C++并发编程

C++并发编程,第1张

第1章:C++的并发世界 1.1 何谓并发

最简单和最基本的并发,是指两个或更多独立的活动同时发生。


并发在生活中随处可见,我们可以一边走路一边说话,也可以两只手同时作不同的动作,还有我们每个人都过着相互独立的生活——当我在游泳的时候,你可以看球赛,等等。


1.1.1 计算机系统中的并发

计算机领域的并发指的是在单个系统里同时执行多个独立的任务,而非顺序的进行一些活动。


计算机领域里,并发不是一个新事物:很多年前,一台计算机就能通过多任务 *** 作系统的切换功能,同时运行多个应用程序;高端多处理器服务器在很早就已经实现了真正的并行计算。


那“老东西”上有哪些“新东西”能让它在计算机领域越来越流行呢?——真正任务并行,而非一种错觉。


以前,大多数计算机只有一个处理器,具有单个处理单元(processing unit)或核心(core),如今还有很多这样的台式机。


这种机器只能在某一时刻执行一个任务,不过它可以每秒进行多次任务切换。


通过“这个任务做一会,再切换到别的任务,再做一会儿”的方式,让任务看起来是并行执行的。


这种方式称为任务切换


如今,我们仍然将这样的系统称为并发:因为任务切换得太快,以至于无法感觉到任务在何时会被暂时挂起,而切换到另一个任务。


任务切换会给用户和应用程序造成一种“并发的假象”。


因为这种假象,当应用在任务切换的环境下和真正并发环境下执行相比,行为还是有着微妙的不同。


特别是对内存模型不正确的假设(详见第5章),在多线程环境中可能不会出现(详见第10章)。


多处理器计算机用于服务器和高性能计算已有多年。


基于单芯多核处理器(多核处理器)的台式机,也越来越大众化。


无论拥有几个处理器,这些机器都能够真正的并行多个任务。


我们称其为硬件并发(hardware concurrency)”。


图1.1显示了一个计算机处理恰好两个任务时的理想情景,每个任务被分为10个相等大小的块。


在一个双核机器(具有两个处理核心)上,每个任务可以在各自的处理核心上执行。


在单核机器上做任务切换时,每个任务的块交织进行。


但它们中间有一小段分隔(图中所示灰色分隔条的厚度大于双核机器的分隔条);为了实现交织进行,系统每次从一个任务切换到另一个时都需要切换一次上下文(context switch),任务切换也有时间开销。


进行上下文的切换时, *** 作系统必须为当前运行的任务保存CPU的状态和指令指针,并计算出要切换到哪个任务,并为即将切换到的任务重新加载处理器状态。


然后,CPU可能要将新任务的指令和数据的内存载入到缓存中,这会阻止CPU执行任何指令,从而造成的更多的延迟。


1.1.2 并发的途径

试想当两个程序员在两个独立的办公室一起做一个软件项目,他们可以安静地工作、不互相干扰,并且他们人手一套参考手册。


但是,他们沟通起来就有些困难,比起可以直接互相交谈,他们必须使用电话、电子邮件或到对方的办公室进行直接交流。


并且,管理两个办公室需要有一定的经费支出,还需要购买多份参考手册。


假设,让开发人员同在一间办公室办公,他们可以自由的对某个应用程序设计进行讨论,也可以在纸或白板上轻易的绘制图表,对设计观点进行辅助性阐释。


现在,你只需要管理一个办公室,只要有一套参考资料就够了。


遗憾的是,开发人员可能难以集中注意力,并且还可能存在资源共享的问题(比如,“参考手册哪去了?”)

以上两种方法,描绘了并发的两种基本途径。


每个开发人员代表一个线程,每个办公室代表一个进程。


第一种途径是每个进程只要一个线程,这就类似让每个开发人员拥有自己的办公室,而第二种途径是每个进程有多个线程,如同一个办公室里有两个开发人员。


让我们在一个应用程序中简单的分析一下这两种途径。


多进程并发

使用并发的第一种方法,是将应用程序分为多个独立的进程,它们在同一时刻运行,就像同时进行网页浏览和文字处理一样。


如图1.3所示,独立的进程可以通过进程间常规的通信渠道传递讯息(信号、套接字、文件、管道等等)。


不过,这种进程之间的通信通常不是设置复杂,就是速度慢,这是因为 *** 作系统会在进程间提供了一定的保护措施,以避免一个进程去修改另一个进程的数据。


还有一个缺点是,运行多个进程所需的固定开销:需要时间启动进程, *** 作系统需要内部资源来管理进程,等等。


当然,以上的机制也不是一无是处: *** 作系统在进程间提供附加的保护 *** 作和更高级别的通信机制,意味着可以更容易编写安全的并发代码。


实际上,在类似于Erlang的编程环境中,将进程作为并发的基本构造块。


使用多进程实现并发还有一个额外的优势———可以使用远程连接(可能需要联网)的方式,在不同的机器上运行独立的进程。


虽然,这增加了通信成本,但在设计精良的系统上,这可能是一个提高并行可用行和性能的低成本方式。


图 1.3 一对并发运行的进程之间的通信

多线程并发

并发的另一个途径,在单个进程中运行多个线程。


线程很像轻量级的进程:每个线程相互独立运行,且线程可以在不同的指令序列中运行。


但是,进程中的所有线程都共享地址空间,并且所有线程访问到大部分数据———全局变量仍然是全局的,指针、对象的引用或数据可以在线程之间传递。


虽然,进程之间通常共享内存,但是这种共享通常是难以建立和管理的。


因为,同一数据的内存地址在不同的进程中是不相同。


图1.4展示了一个进程中的两个线程通过共享内存进行通信。


图 1.4 同一进程中的一对并发运行的线程之间的通信

地址空间共享,以及缺少线程间数据的保护,使得 *** 作系统的记录工作量减小,所以使用多线程相关的开销远远小于使用多个进程。


不过,共享内存的灵活性是有代价的:如果数据要被多个线程访问,那么程序员必须确保每个线程所访问到的数据是一致的(在本书第3、4、5和8章中会涉及,线程间数据共享可能会遇到的问题,以及如何使用工具来避免这些问题)。


问题并非无解,只要在编写代码时适当地注意即可,这同样也意味着需要对线程通信做大量的工作。


多个单线程/进程间的通信(包含启动)要比单一进程中的多线程间的通信(包括启动)的开销大,若不考虑共享内存可能会带来的问题,多线程将会成为主流语言(包括C++)更青睐的并发途径。


此外,C++标准并未对进程间通信提供任何原生支持,所以使用多进程的方式实现,这会依赖与平台相关的API。


因此,本书只关注使用多线程的并发,并且在此之后所提到“并发”,均假设为多线程来实现。


了解并发后,让来看看为什么要使用并发。


1.2 为什么使用并发?

主要原因有两个:关注点分离(SOC)和性能。


事实上,它们应该是使用并发的唯一原因;如果你观察得足够仔细,所有因素都可以归结到其中的一个原因(或者可能是两个都有。


当然,除了像“就因为我愿意”这样的原因之外)。


1.2.1 为了分离关注点

编写软件时,分离关注点是个好主意;通过将相关的代码与无关的代码分离,可以使程序更容易理解和测试,从而减少出错的可能性。


即使一些功能区域中的 *** 作需要在同一时刻发生的情况下,依旧可以使用并发分离不同的功能区域;若不显式地使用并发,就得编写一个任务切换框架,或者在 *** 作中主动地调用一段不相关的代码。


考虑一个有用户界面的处理密集型应用——DVD播放程序。


这样的应用程序,应具备这两种功能:一,要从光盘中读出数据,对图像和声音进行解码,之后把解码出的信号输出至视频和音频硬件,从而实现DVD的无误播放;二,还需要接受来自用户的输入,当用户单击“暂停”、“返回菜单”或“退出”按键的时候执行对应的 *** 作。


当应用是单个线程时,应用需要在回放期间定期检查用户的输入,这就需要把“DVD播放”代码和“用户界面”代码放在一起,以便调用。


如果使用多线程方式来分隔这些关注点,“用户界面”代码和“DVD播放”代码就不再需要放在一起:一个线程可以处理“用户界面”事件,另一个进行“DVD播放”。


它们之间会有交互(用户点击“暂停”),不过任务间需要人为的进行关联。


这会给响应性带来一些错觉,因为用户界面线程通常可以立即响应用户的请求,在当请求传达给忙碌线程,这时的相应可以是简单地显示代表忙碌的光标或“请等待”字样的消息。


类似地,独立的线程通常用来执行那些必须在后台持续运行的任务,例如,桌面搜索程序中监视文件系统变化的任务。


因为它们之间的交互清晰可辨,所以这种方式会使每个线程的逻辑变的更加简单。


在这种情况下,线程的数量不再依赖CPU中的可用内核的数量,因为对线程的划分是基于概念上的设计,而不是一种增加吞吐量的尝试。


1.2.2 为了性能

多处理器系统已经存在了几十年,但直到最近,它们也只在超级计算机、大型机和大型服务器系统中才能看到。


然而,芯片制造商越来越倾向于多核芯片的设计,即在单个芯片上集成2、4、16或更多的处理器,从而获取更好的性能。


因此,多核台式计算机、多核嵌入式设备,现在越来越普遍。


它们计算能力的提高不是源自使单一任务运行的更快,而是并行运行多个任务。


在过去,程序员曾坐看他们的程序随着处理器的更新换代而变得更快,无需他们这边做任何事。


但是现在,就像Herb Sutter所说的,“没有免费的午餐了。


”[1] 如果想要利用日益增长的计算能力,那就必须设计多任务并发式软件


程序员必须留意这个,尤其是那些迄今都忽略并发的人们,现在很有必要将其加入工具箱中了。


两种方式利用并发提高性能:第一,将一个单个任务分成几部分,且各自并行运行,从而降低总运行时间。


这就是任务并行(task parallelism)。


虽然这听起来很直观,但它是一个相当复杂的过程,因为在各个部分之间可能存在着依赖。


区别可能是在过程方面——一个线程执行算法的一部分,而另一个线程执行算法的另一个部分——或是在数据方面——每个线程在不同的数据部分上执行相同的 *** 作(第二种方式)。


后一种方法被称为数据并行(data parallelism)。


第一种并行方式影响的算法常被称为易并行(embarrassingly parallel)算法。


尽管易并行算法的代码会让你感觉到头痛,但这对于你来说是一件好事:我曾遇到过自然并行(naturally parallel)和便利并发(conveniently concurrent)的算法。


易并行算法具有良好的可扩展特性——当可用硬件线程的数量增加时,算法的并行性也会随之增加。


这种算法能很好的体现人多力量大


如果算法中有不易并行的部分,你可以把算法划分成固定(不可扩展)数量的并行任务。


第8章将会再来讨论,在线程之间划分任务的技巧。


第二种方法是使用可并行的方式,来解决更大的问题;与其同时处理一个文件,不如酌情处理2个、10个或20个。


虽然,这是数据并行的一种应用(通过对多组数据同时执行相同的 *** 作),但着重点不同。


处理一个数据块仍然需要同样的时间,但在相同的时间内处理了更多的数据。


当然,这种方法也有限制,并非在所有情况下都是有益的。


不过,这种方法所带来的吞吐量提升,可以让某些新功能成为可能,例如,可以并行处理图片的各部分,就能提高视频的分辨率。


1.2.3 什么时候不使用并发

知道何时不使用并发与知道何时使用它一样重要。


基本上,不使用并发的唯一原因就是,收益比不上成本。


使用并发的代码在很多情况下难以理解,因此编写和维护的多线程代码就会产生直接的脑力成本,同时额外的复杂性也可能引起更多的错误。


除非潜在的性能增益足够大或关注点分离地足够清晰,能抵消所需的额外的开发时间以及与维护多线程代码相关的额外成本(代码正确的前提下);否则,别用并发。


同样地,性能增益可能会小于预期;因为 *** 作系统需要分配内核相关资源和堆栈空间,所以在启动线程时存在固有的开销,然后才能把新线程加入调度器中,所有这一切都需要时间。


如果在线程上的任务完成得很快,那么任务实际执行的时间要比启动线程的时间小很多,这就会导致应用程序的整体性能还不如直接使用“产生线程”的方式。


此外,线程是有限的资源。


如果让太多的线程同时运行,则会消耗很多 *** 作系统资源,从而使得 *** 作系统整体上运行得更加缓慢。


不仅如此,因为每个线程都需要一个独立的堆栈空间,所以运行太多的线程也会耗尽进程的可用内存或地址空间。


对于一个可用地址空间为4GB(32bit)的平坦架构的进程来说,这的确是个问题:如果每个线程都有一个1MB的堆栈(很多系统都会这样分配),那么4096个线程将会用尽所有地址空间,不会给代码、静态数据或者堆数据留有任何空间。


即便64位(或者更大)的系统不存在这种直接的地址空间限制,但其他资源有限:如果你运行了太多的线程,最终也是出会问题的。


尽管线程池(参见第9章)可以用来限制线程的数量,但这也并不是什么灵丹妙药,它也有自己的问题。


当客户端/服务器(C/S)应用在服务器端为每一个链接启动一个独立的线程,对于少量的链接是可以正常工作的,但当同样的技术用于需要处理大量链接的高需求服务器时,也会因为线程太多而耗尽系统资源。


在这种场景下,使用线程池可以对性能产生优化(参见第9章)。


最后,运行越多的线程, *** 作系统就需要做越多的上下文切换,每一次切换都需要耗费本可以花在有价值工作上的时间。


所以在某些时候,增加一个额外的线程实际上会降低,而非提高应用程序的整体性能。


为此,如果你试图得到系统的最佳性能,可以考虑使用硬件并发(或不用),并调整运行线程的数量。


为性能而使用并发就像所有其他优化策略一样:它拥有大幅度提高应用性能的潜力,但它也可能使代码复杂化,使其更难理解,并更容易出错。


因此,只有应用中具有显著增益潜力的性能关键部分,才值得并发化。


当然,如果性能收益的潜力仅次于设计清晰或关注点分离,可能也值得使用多线程设计。


假设你已经决定确实要在应用中使用并发,无论是为了性能、关注点分离,亦或是因为多线程星期一(multithreading Monday)(译者:可能是学习多线程的意思)。


问题又来了,对于C++程序员来说,多线程意味着什么?

1.3 C++中的并发和多线程

通过多线程为C++并发提供标准化支持是件新鲜事。


只有在C++11标准下,才能编写不依赖平台扩展的多线程代码。


了解C++线程库中的众多规则前,先来了解一下其发展的历史。


1.3.1 C++多线程历史

C++98(1998)标准不承认线程的存在,并且各种语言要素的 *** 作效果都以顺序抽象机的形式编写。


不仅如此,内存模型也没有正式定义,所以在C++98标准下,没办法在缺少编译器相关扩展的情况下编写多线程应用程序。


当然,编译器供应商可以自由地向语言添加扩展,添加C语言中流行的多线程API———POSIX标准中的C标准和Microsoft Windows API中的那些———这就使得很多C++编译器供应商通过各种平台相关扩展来支持多线程。


这种编译器支持一般受限于只能使用平台相关的C语言API,并且该C++运行库(例如,异常处理机制的代码)能在多线程情况下正常工作。


因为编译器和处理器的实际表现很不错了,所以在少数编译器供应商提供正式的多线程感知内存模型之前,程序员们已经编写了大量的C++多线程程序了。


由于不满足于使用平台相关的C语言API来处理多线程,C++程序员们希望使用的类库能提供面向对象的多线程工具。


像MFC这样的应用框架,如同Boost和ACE这样的已积累了多组类的通用C++类库,这些类封装了底层的平台相关API,并提供用来简化任务的高级多线程工具。


各种类和库在细节方面差异很大,但在启动新线程的方面,总体构造却大同小异。


一个为许多C++类和库共有的设计,同时也是为程序员提供很大便利的设计,也就是使用带锁的获取资源即初始化(RAII, Resource Acquisition Is Initialization)的习惯,来确保当退出相关作用域时互斥元解锁。


编写多线程代码需要坚实的编程基础,当前的很多C++编译器为多线程编程者提供了对应(平台相关)的API;当然,还有一些与平台无关的C++类库(例如:Boost和ACE)。


正因为如此,程序员们可以通过这些API来实现多线程应用。


不过,由于缺乏统一标准的支持,缺少统一的线程内存模型,进而导致一些问题,这些问题在跨硬件或跨平台相关的多线程应用上表现得尤为明显。


1.3.2 新标准支持并发

所有的这些随着C++11标准的发布而改变了,新标准中不仅有了一个全新的线程感知内存模型,C++标准库也扩展了:包含了用于管理线程(参见第2章)、保护共享数据(参见第3章)、线程间同步 *** 作(参见第4章),以及低级原子 *** 作(参见第5章)的各种类。


新C++线程库很大程度上,是基于上文提到的C++类库的经验积累。


特别是,Boost线程库作为新类库的主要模型,很多类与Boost库中的相关类有着相同名称和结构。


随着C++标准的进步,Boost线程库也配合着C++标准在许多方面做出改变,因此之前使用Boost的用户将会发现自己非常熟悉C++11的线程库。


如本章起始提到的那样,支持并发仅仅是C++标准的变化之一,此外还有很多对于编程语言自身的改善,就是为了让程序员们的工作变得更加轻松。


这些内容在本书的论述范围之外,但是其中的一些变化对于线程库本身及其使用方式产生了很大的影响。


附录A会对这些特性做一些介绍。


新的C++标准直接支持原子 *** 作,允许程序员通过定义语义的方式编写高效的代码,从而无需了解与平台相关的汇编指令。


这对于试图编写高效、可移植代码的程序员们来说是一个好消息;编译器不仅可以搞定具体平台,还可以编写优化器来解释 *** 作语义,从而让程序整体得到更好的优化。


1.3.3 C++线程库的效率

通常情况下,这是高性能计算开发者对C++的担忧之一。


为了效率,C++类整合了一些底层工具。


这样就需要了解相关使用高级工具和使用低级工具的开销差,这个开销差就是抽象代价(abstraction penalty)。


C++标准委员会在设计标准库时,特别是设计标准线程库的时候,就已经注意到了这点;目的就是在实现相同功能的前提下,直接使用底层API并不会带来过多的性能收益。


因此,该类库在大部分主流平台上都能实现高效(带有非常低的抽象代价)。


C++标准委员会为了达到终极性能,需要确保C++能给那些要与硬件打交道的程序员,提供足够多的的底层工具。


为了这个目的,伴随着新的内存模型,出现了一个综合的原子 *** 作库,可用于直接控制单个位、字节、内部线程间同步,以及所有变化的可见性。


原子类型和相应的 *** 作现在可以在很多地方使用,而这些地方以前可能使用的是平台相关的汇编代码。


使用了新标准的代码会具有更好的可移植性,而且更容易维护。


C++标准库也提供了更高级别的抽象和工具,使得编写多线程代码更加简单,并且不易出错。


有时运用这些工具确实会带来性能开销,因为有额外的代码必须执行。


但是,这种性能成本并不一定意味着更高的抽象代价;总体来看,这种性能开销并不比手工编写等效函数高,而且编译器可能会很好地内联大部分额外代码。


某些情况下,高级工具会提供一些额外的功能。


大部分情况下这都不是问题,因为你没有为你不使用的那部分买单。


在罕见的情况下,这些未使用的功能会影响其他代码的性能。


如果你很看重程序的性能,并且高级工具带来的开销过高,你最好是通过较低级别的工具来实现你需要的功能。


绝大多数情况下,额外增加的复杂性和出错几率都远大于性能的小幅提升带来的收益。


即便是有证据确实表明瓶颈出现在C++标准库的工具中,也可能会归咎于低劣的应用设计,而非低劣的类库实现。


例如,如果过多的线程竞争一个互斥单元,将会很明显的影响性能。


与其在互斥 *** 作上耗费时间,不如重新设计应用,减少互斥元上的竞争来得划算。


如何减少应用中的竞争,会在第8章中再次提及。


在C++标准库没有提供所需的性能或行为时,就需要使用与平台相关的工具。


1.3.4 平台相关的工具

虽然C++线程库为多线程和并发处理提供了较全面的工具,但在某些平台上提供额外的工具。


为了方便地访问那些工具的同时,又使用标准C++线程库,在C++线程库中提供一个native_handle()成员函数,允许通过使用平台相关API直接 *** 作底层实现。


就其本质而言,任何使用native_handle()执行的 *** 作都是完全依赖于平台的,这超出了本书(同时也是标准C++库本身)的范围。


所以,使用平台相关的工具之前,要明白标准库能够做什么,那么下面通过一个栗子来展示下吧。


1.4 开始入门

ok!现在你有一个能与C++11标准兼容的编译器。


接下来呢?一个C++多线程程序是什么样子呢?其实,它看上去和其他C++程序差不多,通常是变量、类以及函数的组合。


唯一的区别在于某些函数可以并发运行,所以需要确保共享数据在并发访问时是安全的,详见第3章。


当然,为了并发地运行函数,必须使用特定的函数以及对象来管理各个线程。


1.4.1 你好,并发世界

从一个经典的例子开始:一个打印“Hello World.”的程序。


一个非常简单的在单线程中运行的Hello World程序如下所示,当我们谈到多线程时,它可以作为一个基准。


#include 
int main()
{
  std::cout << "Hello World\n";
}

这个程序所做的就是将“Hello World”写进标准输出流。


让我们将它与下面清单所示的简单的“Hello, Concurrent World”程序做个比较,它启动了一个独立的线程来显示这个信息。


清单 1.1 一个简单的Hello, Concurrent World程序:

#include 
#include   //①
void hello()  //②
{
  std::cout << "Hello Concurrent World\n";
}
int main()
{
  std::thread t(hello);  //③
  t.join();  //④
}

第一个区别是增加了#include ①,标准C++库中对多线程支持的声明在新的头文件中:管理线程的函数和类在中声明,而保护共享数据的函数和类在其他头文件中声明。


其次,打印信息的代码被移动到了一个独立的函数中②。


因为每个线程都必须具有一个初始函数(initial function),新线程的执行从这里开始。


对于应用程序来说,初始线程是main(),但是对于其他线程,可以在std::thread对象的构造函数中指定——本例中,被命名为t③的std::thread对象拥有新函数hello()作为其初始函数。


下一个区别:与直接写入标准输出或是从main()调用hello()不同,该程序启动了一个全新的线程来实现,将线程数量一分为二——初始线程始于main(),而新线程始于hello()。


新的线程启动之后③,初始线程继续执行。


如果它不等待新线程结束,它就将自顾自地继续运行到main()的结束,从而结束程序——有可能发生在新线程运行之前。


这就是为什么在④这里调用join()的原因——详见第2章,这会导致调用线程(在main()中)等待与std::thread对象相关联的线程,即这个例子中的t。


这看起来仅仅为了将一条信息写入标准输出而做了大量的工作,确实如此——正如上文1.2.3节所描述的,一般来说并不值得为了如此简单的任务而使用多线程,尤其是在这期间初始线程并没做什么。


本书后面的内容中,将通过实例来展示在哪些情景下使用多线程可以获得收益。


1.5 本章总结

本章中,提及了并发与多线程的含义,以及在你的应用程序中为什么你会选择使用(或不使用)它。


还提及了多线程在C++中的发展历程,从1998标准中完全缺乏支持,经历了各种平台相关的扩展,再到新的C++11标准中具有合适的多线程支持。


芯片制造商选择了以多核心的形式,使得更多任务可以同时执行的方式来增加处理能力,而不是增加单个核心的执行速度。


在这个趋势下,C++多线程来的正是时候,它使得程序员们可以利用新的CPU,带来的更加强大的硬件并发。


使用1.4节中例子,展示C++标准库中的类和函数有多么的简单。


C++中使用多线程并不复杂,复杂的是如何设计代码以实现其预期的行为。


尝试了1.4节的示例后,是时候看看更多实质性的内容了。


第2章中,我们将了解一下用于管理线程的类和函数。


第2章 线程管理

本章主要内容

  • 启动新线程
  • 等待线程与分离线程
  • 线程唯一标识符

好的!看来你已经决定使用多线程了。


先做点什么呢?启动线程、结束线程,还是如何监管线程?C++标准库中只需要管理std::thread关联的线程,无需把注意力放在其他方面。


不过,标准库太灵活,所以管理起来不会太容易。


本章将从基本开始:启动一个线程,等待这个线程结束,或放在后台运行。


再看看怎么给已经启动的线程函数传递参数,以及怎么将一个线程的所有权从当前std::thread对象移交给另一个。


最后,再来确定线程数,以及识别特殊线程。


2.1 线程管理的基础

每个程序至少有一个线程:执行main()函数的线程,其余线程有其各自的入口函数。


线程与原始线程(以main()为入口函数的线程)同时运行。


如同main()函数执行完会退出一样,当线程执行完入口函数后,线程也会退出。


在为一个线程创建了一个std::thread对象后,需要等待这个线程结束;不过,线程需要先进行启动。


下面就来启动线程。


2.1.1 启动线程

第1章中,线程在std::thread对象创建(为线程指定任务)时启动。


最简单的情况下,任务也会很简单,通常是无参数无返回的函数。


这种函数在其所属线程上运行,直到函数执行完毕,线程也就结束了。


在一些极端情况下,线程运行时,任务中的函数对象需要通过某种通讯机制进行参数的传递,或者执行一系列独立 *** 作;可以通过通讯机制传递信号,让线程停止。


线程要做什么,以及什么时候启动,其实都无关紧要。


总之,使用C++线程库启动线程,可以归结为构造std::thread对象:

void do_some_work();
std::thread my_thread(do_some_work);
为了让编译器识别std::thread类,这个简单的例子也要包含头文件。


如同大多数C++标准库一样,std::thread可以用可调用类型构造,将带有函数调用符类型的实例传入std::thread类中,替换默认的构造函数。


class background_task
{
public:
  void operator()() const
  {
    do_something();
    do_something_else();
  }
};

background_task f;
std::thread my_thread(f);

代码中,提供的函数对象会复制到新线程的存储空间当中,函数对象的执行和调用都在线程的内存空间中进行。


函数对象的副本应与原始函数对象保持一致,否则得到的结果会与我们的期望不同。


有件事需要注意,当把函数对象传入到线程构造函数中时,需要避免“最令人头痛的语法解析”(C++’s most vexing parse, 中文简介)。


如果你传递了一个临时变量,而不是一个命名的变量;C++编译器会将其解析为函数声明,而不是类型对象的定义。


例如:

std::thread my_thread(background_task());

这里相当与声明了一个名为my_thread的函数,这个函数带有一个参数(函数指针指向没有参数并返回background_task对象的函数),返回一个std::thread对象的函数,而非启动了一个线程。


使用在前面命名函数对象的方式,或使用多组括号①,或使用新统一的初始化语法②,可以避免这个问题。


如下所示:

std::thread my_thread((background_task()));  // 1
std::thread my_thread{background_task()};    // 2
使用lambda表达式也能避免这个问题。


lambda表达式是C++11的一个新特性,它允许使用一个可以捕获局部变量的局部函数(可以避免传递参数,参见2.2节)。


想要具体的了解lambda表达式,可以阅读附录A的A.5节。


之前的例子可以改写为lambda表达式的类型:

std::thread my_thread([]{
  do_something();
  do_something_else();
});

启动了线程,你需要明确是要等待线程结束(加入式——参见2.1.2节),还是让其自主运行(分离式——参见2.1.3节)。


如果std::thread对象销毁之前还没有做出决定,程序就会终止(std::thread的析构函数会调用std::terminate())。


因此,即便是有异常存在,也需要确保线程能够正确的加入(joined)或分离(detached)。


2.1.3节中,会介绍对应的方法来处理这两种情况。


需要注意的是,必须在std::thread对象销毁之前做出决定,否则你的程序将会终止(std::thread的析构函数会调用std::terminate(),这时再去决定会触发相应异常)。


如果不等待线程,就必须保证线程结束之前,可访问的数据得有效性。


这不是一个新问题——单线程代码中,对象销毁之后再去访问,也会产生未定义行为——不过,线程的生命周期增加了这个问题发生的几率。


这种情况很可能发生在线程还没结束,函数已经退出的时候,这时线程函数还持有函数局部变量的指针或引用。


下面的清单中就展示了这样的一种情况。


清单2.1 函数已经结束,线程依旧访问局部变量

struct func
{
  int& i;
  func(int& i_) : i(i_) {}
  void operator() ()
  {
    for (unsigned j=0 ; j<1000000 ; ++j)
    {
      do_something(i);           // 1. 潜在访问隐患:悬空引用
    }
  }
};

void oops()
{
  int some_local_state=0;
  func my_func(some_local_state);
  std::thread my_thread(my_func);
  my_thread.detach();          // 2. 不等待线程结束
}                              // 3. 新线程可能还在运行

这个例子中,已经决定不等待线程结束(使用了detach()②),所以当oops()函数执行完成时③,新线程中的函数可能还在运行。


如果线程还在运行,它就会去调用do_something(i)函数①,这时就会访问已经销毁的变量。


如同一个单线程程序——允许在函数完成后继续持有局部变量的指针或引用;当然,这从来就不是一个好主意——这种情况发生时,错误并不明显,会使多线程更容易出错。


处理这种情况的常规方法:使线程函数的功能齐全,将数据复制到线程中,而非复制到共享数据中。


如果使用一个可调用的对象作为线程函数,这个对象就会复制到线程中,而后原始对象就会立即销毁。


但对于对象中包含的指针和引用还需谨慎,例如清单2.1所示。


使用一个能访问局部变量的函数去创建线程是一个糟糕的主意(除非十分确定线程会在函数完成前结束)。


此外,可以通过join()函数来确保线程在函数完成前结束。


2.1.2 等待线程完成

如果需要等待线程,相关的std::thread实例需要使用join()。


清单2.1中,将my_thread.detach()替换为my_thread.join(),就可以确保局部变量在线程完成后,才被销毁。


在这种情况下,因为原始线程在其生命周期中并没有做什么事,使得用一个独立的线程去执行函数变得收益甚微,但在实际编程中,原始线程要么有自己的工作要做;要么会启动多个子线程来做一些有用的工作,并等待这些线程结束。


join()是简单粗暴的等待线程完成或不等待。


当你需要对等待中的线程有更灵活的控制时,比如,看一下某个线程是否结束,或者只等待一段时间(超过时间就判定为超时)。


想要做到这些,你需要使用其他机制来完成,比如条件变量和期待(futures),相关的讨论将会在第4章继续。


调用join()的行为,还清理了线程相关的存储部分,这样std::thread对象将不再与已经完成的线程有任何关联。


这意味着,只能对一个线程使用一次join();一旦已经使用过join(),std::thread对象就不能再次加入了,当对其使用joinable()时,将返回false。


2.1.3 特殊情况下的等待

如前所述,需要对一个还未销毁的std::thread对象使用join()或detach()。


如果想要分离一个线程,可以在线程启动后,直接使用detach()进行分离。


如果打算等待对应线程,则需要细心挑选调用join()的位置。


当在线程运行之后产生异常,在join()调用之前抛出,就意味着这次调用会被跳过。


避免应用被抛出的异常所终止,就需要作出一个决定。


通常,当倾向于在无异常的情况下使用join()时,需要在异常处理过程中调用join(),从而避免生命周期的问题。


下面的程序清单是一个例子。


清单 2.2 等待线程完成

struct func; // 定义在清单2.1中
void f()
{
  int some_local_state=0;
  func my_func(some_local_state);
  std::thread t(my_func);
  try
  {
    do_something_in_current_thread();
  }
  catch(...)
  {
    t.join();  // 1
    throw;
  }
  t.join();  // 2
}

清单2.2中的代码使用了try/catch块确保访问本地状态的线程退出后,函数才结束。


当函数正常退出时,会执行到②处;当函数执行过程中抛出异常,程序会执行到①处。


try/catch块能轻易的捕获轻量级错误,所以这种情况,并非放之四海而皆准。


如需确保线程在函数之前结束——查看是否因为线程函数使用了局部变量的引用,以及其他原因——而后再确定一下程序可能会退出的途径,无论正常与否,可以提供一个简洁的机制,来做解决这个问题。


一种方式是使用“资源获取即初始化方式”(RAII,Resource Acquisition Is Initialization),并且提供一个类,在析构函数中使用join(),如同下面清单中的代码。


看它如何简化f()函数。


清单 2.3 使用RAII等待线程完成

class thread_guard
{
  std::thread& t;
public:
  explicit thread_guard(std::thread& t_):
    t(t_)
  {}
  ~thread_guard()
  {
    if(t.joinable()) // 1
    {
      t.join();      // 2
    }
  }
  thread_guard(thread_guard const&)=delete;   // 3
  thread_guard& operator=(thread_guard const&)=delete;
};

struct func; // 定义在清单2.1中

void f()
{
  int some_local_state=0;
  func my_func(some_local_state);
  std::thread t(my_func);
  thread_guard g(t);
  do_something_in_current_thread();
}    // 4

当线程执行到④处时,局部对象就要被逆序销毁了。


因此,thread_guard对象g是第一个被销毁的,这时线程在析构函数中被加入②到原始线程中。


即使do_something_in_current_thread抛出一个异常,这个销毁依旧会发生。


在thread_guard的析构函数的测试中,首先判断线程是否已加入①,如果没有会调用join()②进行加入。


这很重要,因为join()只能对给定的对象调用一次,所以对给已加入的线程再次进行加入 *** 作时,将会导致错误。


拷贝构造函数和拷贝赋值 *** 作被标记为=delete③,是为了不让编译器自动生成它们。


直接对一个对象进行拷贝或赋值是危险的,因为这可能会弄丢已经加入的线程。


通过删除声明,任何尝试给thread_guard对象赋值的 *** 作都会引发一个编译错误。


想要了解删除函数的更多知识,请参阅附录A的A.2节。


如果不想等待线程结束,可以分离(detaching)线程,从而避免异常安全*(exception-safety)问题。


不过,这就打破了线程与std::thread对象的联系,即使线程仍然在后台运行着,分离 *** 作也能确保std::terminate()std::thread对象销毁才被调用。


2.1.4 后台运行线程

使用detach()会让线程在后台运行,这就意味着主线程不能与之产生直接交互。


也就是说,不会等待这个线程结束;如果线程分离,那么就不可能有std::thread对象能引用它,分离线程的确在后台运行,所以分离线程不能被加入。


不过C++运行库保证,当线程退出时,相关资源的能够正确回收,后台线程的归属和控制C++运行库都会处理。


通常称分离线程为守护线程(daemon threads),UNIX中守护线程是指,没有任何显式的用户接口,并在后台运行的线程。


这种线程的特点就是长时间运行;线程的生命周期可能会从某一个应用起始到结束,可能会在后台监视文件系统,还有可能对缓存进行清理,亦或对数据结构进行优化。


另一方面,分离线程的另一方面只能确定线程什么时候结束,发后即忘(fire and forget)的任务就使用到线程的这种方式。


如2.1.2节所示,调用std::thread成员函数detach()来分离一个线程。


之后,相应的std::thread对象就与实际执行的线程无关了,并且这个线程也无法加入:

std::thread t(do_background_work);
t.detach();
assert(!t.joinable());

为了从std::thread对象中分离线程(前提是有可进行分离的线程),不能对没有执行线程的std::thread对象使用detach(),也是join()的使用条件,并且要用同样的方式进行检查——当std::thread对象使用t.joinable()返回的是true,就可以使用t.detach()。


试想如何能让一个文字处理应用同时编辑多个文档。


无论是用户界面,还是在内部应用内部进行,都有很多的解决方法。


虽然,这些窗口看起来是完全独立的,每个窗口都有自己独立的菜单选项,但他们却运行在同一个应用实例中。


一种内部处理方式是,让每个文档处理窗口拥有自己的线程;每个线程运行同样的的代码,并隔离不同窗口处理的数据。


如此这般,打开一个文档就要启动一个新线程。


因为是对独立的文档进行 *** 作,所以没有必要等待其他线程完成。


因此,这里就可以让文档处理窗口运行在分离的线程上。


下面代码简要的展示了这种方法:

清单2.4 使用分离线程去处理其他文档

void edit_document(std::string const& filename)
{
  open_document_and_display_gui(filename);
  while(!done_editing())
  {
    user_command cmd=get_user_input();
    if(cmd.type==open_new_document)
    {
      std::string const new_name=get_filename_from_user();
      std::thread t(edit_document,new_name);  // 1
      t.detach();  // 2
    }
    else
    {
       process_user_input(cmd);
    }
  }
}

如果用户选择打开一个新文档,需要启动一个新线程去打开新文档①,并分离线程②。


与当前线程做出的 *** 作一样,新线程只不过是打开另一个文件而已。


所以,edit_document函数可以复用,通过传参的形式打开新的文件。


这个例子也展示了传参启动线程的方法:不仅可以向std::thread构造函数①传递函数名,还可以传递函数所需的参数(实参)。


C++线程库的方式也不是很复杂。


当然,也有其他方法完成这项功能,比如:使用一个带有数据成员的成员函数,代替一个需要传参的普通函数。


2.2 向线程函数传递参数

清单2.4中,向std::thread构造函数中的可调用对象,或函数传递一个参数很简单。


需要注意的是,默认参数要拷贝到线程独立内存中,即使参数是引用的形式,也可以在新线程中进行访问。


再来看一个例子:

void f(int i, std::string const& s);
std::thread t(f, 3, "hello");

代码创建了一个调用f(3, "hello")的线程。


注意,函数f需要一个std::string对象作为第二个参数,但这里使用的是字符串的字面值,也就是char const *类型。


之后,在线程的上下文中完成字面值向std::string对象的转化。


需要特别要注意,当指向动态变量的指针作为参数传递给线程的情况,代码如下:

void f(int i,std::string const& s);
void oops(int some_param)
{
  char buffer[1024]; // 1
  sprintf(buffer, "%i",some_param);
  std::thread t(f,3,buffer); // 2
  t.detach();
}

这种情况下,buffer是一个指针变量,指向本地变量,然后本地变量通过buffer传递到新线程中


并且,函数有很有可能会在字面值转化成std::string对象之前崩溃(oops),从而导致一些未定义的行为。


并且想要依赖隐式转换将字面值转换为函数期待的std::string对象,但因std::thread的构造函数会复制提供的变量,就只复制了没有转换成期望类型的字符串字面值。


解决方案就是在传递到std::thread构造函数之前就将字面值转化为std::string对象:

void f(int i,std::string const& s);
void not_oops(int some_param)
{
  char buffer[1024];
  sprintf(buffer,"%i",some_param);
  std::thread t(f,3,std::string(buffer));  // 使用std::string,避免悬垂指针
  t.detach();
}

还可能遇到相反的情况:期望传递一个引用,但整个对象被复制了。


当线程更新一个引用传递的数据结构时,这种情况就可能发生,比如:

void update_data_for_widget(widget_id w,widget_data& data); // 1
void oops_again(widget_id w)
{
  widget_data data;
  std::thread t(update_data_for_widget,w,data); // 2
  display_status();
  t.join();
  process_widget_data(data); // 3
}

虽然update_data_for_widget的第二个参数期待传入一个引用,但是std::thread的构造函数并不知晓;构造函数无视函数期待的参数类型,并盲目的拷贝已提供的变量。


当线程调用update_data_for_widget函数时,传递给函数的参数是data变量内部拷贝的引用,而非数据本身的引用。


因此,当线程结束时,内部拷贝数据将会在数据更新阶段被销毁,且process_widget_data将会接收到没有修改的data变量


可以使用std::ref将参数转换成引用的形式,从而可将线程的调用改为以下形式:

std::thread t(update_data_for_widget,w,std::ref(data));

在这之后,update_data_for_widget就会接收到一个data变量的引用,而非一个data变量拷贝的引用。


如果你熟悉std::bind,就应该不会对以上述传参的形式感到奇怪,因为std::thread构造函数和std::bind的 *** 作都在标准库中定义好了,可以传递一个成员函数指针作为线程函数,并提供一个合适的对象指针作为第一个参数:

class X
{
public:
  void do_lengthy_work();
};
X my_x;
std::thread t(&X::do_lengthy_work,&my_x); // 1

这段代码中,新线程将my_x.do_lengthy_work()作为线程函数;my_x的地址作为指针对象提供给函数。


也可以为成员函数提供参数:std::thread构造函数的第三个参数就是成员函数的第一个参数,以此类推(代码如下,译者自加)


class X
{
public:
  void do_lengthy_work(int);
};
X my_x;
int num(0);
std::thread t(&X::do_lengthy_work, &my_x, num);

有趣的是,提供的参数可以移动,但不能拷贝


"移动"是指:原始对象中的数据转移给另一对象,而转移的这些数据就不再在原始对象中保存了(译者:比较像在文本编辑的"剪切" *** 作)


std::unique_ptr就是这样一种类型(译者:C++11中的智能指针),这种类型为动态分配的对象提供内存自动管理机制(译者:类似垃圾回收)


同一时间内,只允许一个std::unique_ptr实现指向一个给定对象,并且当这个实现销毁时,指向的对象也将被删除。


移动构造函数(move constructor)移动赋值 *** 作符(move assignment operator)允许一个对象在多个std::unique_ptr实现中传递(有关"移动"的更多内容,请参考附录AA.1.1)


使用"移动"转移原对象后,就会留下一个空指针(NULL)


移动 *** 作可以将对象转换成可接受的类型,例如:函数参数或函数返回的类型。


当原对象是一个临时变量时,自动进行移动 *** 作,但当原对象是一个命名变量,那么转移的时候就需要使用std::move()进行显示移动。


下面的代码展示了std::move的用法,展示了std::move是如何转移一个动态对象到一个线程中去的:

void process_big_object(std::unique_ptr);

std::unique_ptr p(new big_object);
p->prepare_data(42);
std::thread t(process_big_object,std::move(p));

std::thread的构造函数中指定std::move(p),big_object对象的所有权就被首先转移到新创建线程的的内部存储中,之后传递给process_big_object函数。


标准线程库中和std::unique_ptr在所属权上有相似语义类型的类有好几种,std::thread为其中之一。


虽然,std::thread实例不像std::unique_ptr那样能占有一个动态对象的所有权,但是它能占有其他资源:每个实例都负责管理一个执行线程。


执行线程的所有权可以在多个std::thread实例中互相转移,这是依赖于std::thread实例的可移动不可复制性。


不可复制保性证了在同一时间点,一个std::thread实例只能关联一个执行线程;可移动性使得程序员可以自己决定,哪个实例拥有实际执行线程的所有权。


2.3 转移线程所有权

假设要写一个在后台启动线程的函数,想通过新线程返回的所有权去调用这个函数,而不是等待线程结束再去调用;或完全与之相反的想法:创建一个线程,并在函数中转移所有权,都必须要等待线程结束。


总之,新线程的所有权都需要转移。


这就是移动引入std::thread的原因,C++标准库中有很多资源占有(resource-owning)类型,比如std::ifstream,std::unique_ptr还有std::thread都是可移动,但不可拷贝。


这就说明执行线程的所有权可以在std::thread实例中移动,下面将展示一个例子。


例子中,创建了两个执行线程,并且在std::thread实例之间(t1,t2t3)转移所有权:

void some_function();
void some_other_function();
std::thread t1(some_function);            // 1
std::thread t2=std::move(t1);            // 2
t1=std::thread(some_other_function);    // 3
std::thread t3;                            // 4
t3=std::move(t2);                        // 5
t1=std::move(t3);                        // 6 赋值 *** 作将使程序崩溃

当显式使用std::move()创建t2t1的所有权就转移给了t2


之后,t1和执行线程已经没有关联了;执行some_function的函数现在与t2关联。


然后,与一个临时std::thread对象相关的线程启动了


为什么不显式调用std::move()转移所有权呢?因为,所有者是一个临时对象——移动 *** 作将会隐式的调用。


t3使用默认构造方式创建,与任何执行线程都没有关联。


调用std::move()将与t2关联线程的所有权转移到t3


因为t2是一个命名对象,需要显式的调用std::move()


移动 *** 作完成后,t1与执行some_other_function的线程相关联,t2与任何线程都无关联,t3与执行some_function的线程相关联。


最后一个移动 *** 作,将some_function线程的所有权转移t1


不过,t1已经有了一个关联的线程(执行some_other_function的线程),所以这里系统直接调用std::terminate()终止程序继续运行。


这样做(不抛出异常,std::terminate()noexcept函数)是为了保证与std::thread的析构函数的行为一致。


2.1.1节中,需要在线程对象被析构前,显式的等待线程完成,或者分离它;进行赋值时也需要满足这些条件(说明:不能通过赋一个新值给std::thread对象的方式来"丢弃"一个线程)


std::thread支持移动,就意味着线程的所有权可以在函数外进行转移,就如下面程序一样。


清单2.5 函数返回std::thread对象

std::thread f()
{
  void some_function();
  return std::thread(some_function);
}

std::thread g()
{
  void some_other_function(int);
  std::thread t(some_other_function,42);
  return t;
}

当所有权可以在函数内部传递,就允许std::thread实例可作为参数进行传递,代码如下:

void f(std::thread t);
void g()
{
  void some_function();
  f(std::thread(some_function));
  std::thread t(some_function);
  f(std::move(t));
}

std::thread支持移动的好处是可以创建thread_guard类的实例(定义见 清单2.3),并且拥有其线程的所有权。


thread_guard对象所持有的线程已经被引用,移动 *** 作就可以避免很多不必要的麻烦;这意味着,当某个对象转移了线程的所有权后,它就不能对线程进行加入或分离。


为了确保线程程序退出前完成,下面的代码里定义了scoped_thread类。


现在,我们来看一下这段代码:

清单2.6 scoped_thread的用法

class scoped_thread
{
  std::thread t;
public:
  explicit scoped_thread(std::thread t_):                 // 1
    t(std::move(t_))
  {
    if(!t.joinable())                                     // 2
      throw std::logic_error(“No thread”);
  }
  ~scoped_thread()
  {
    t.join();                                            // 3
  }
  scoped_thread(scoped_thread const&)=delete;
  scoped_thread& operator=(scoped_thread const&)=delete;
};

struct func; // 定义在清单2.1中

void f()
{
  int some_local_state;
  scoped_thread t(std::thread(func(some_local_state)));    // 4
  do_something_in_current_thread();
}    

与清单2.3相似,不过这里新线程是直接传递到scoped_thread,而非创建一个独立的命名变量。


当主线程到达f()函数的末尾时,scoped_thread对象将会销毁,然后加入到的构造函数创建的线程对象中去。


而在清单2.3中的thread_guard类,就要在析构的时候检查线程是否"可加入"


这里把检查放在了构造函数中,并且当线程不可加入时,抛出异常。


std::thread对象的容器,如果这个容器是移动敏感的(比如,标准中的std::vector<>),那么移动 *** 作同样适用于这些容器。


了解这些后,就可以写出类似清单2.7中的代码,代码量产了一些线程,并且等待它们结束。


清单2.7 量产线程,等待它们结束

void do_work(unsigned id);
void f()
{
  std::vector threads;
  for(unsigned i=0; i < 20; ++i)
  {
    threads.push_back(std::thread(do_work,i)); // 产生线程
  } 
  std::for_each(threads.begin(),threads.end(),
                  std::mem_fn(&std::thread::join)); // 对每个线程调用join()
}

我们经常需要线程去分割一个算法的总工作量,所以在算法结束的之前,所有的线程必须结束。


清单2.7说明线程所做的工作都是独立的,并且结果仅会受到共享数据的影响。


如果f()有返回值,这个返回值就依赖于线程得到的结果。


在写入返回值之前,程序会检查使用共享数据的线程是否终止。


*** 作结果在不同线程中转移的替代方案,我们会在第4章中再次讨论。


std::thread放入std::vector是向线程自动化管理迈出的第一步:并非为这些线程创建独立的变量,并且将他们直接加入,可以把它们当做一个组。


创建一组线程(数量在运行时确定),可使得这一步迈的更大,而非像清单2.7那样创建固定数量的线程。


2.4 运行时决定线程数量

std::thread::hardware_concurrency()在新版C++标准库中是一个很有用的函数。


这个函数将返回能同时并发在一个程序中的线程数量。


例如,多核系统中,返回值可以是CPU核芯的数量。


返回值也仅仅是一个提示,当系统信息无法获取时,函数也会返回0


但是,这也无法掩盖这个函数对启动线程数量的帮助。


清单2.8实现了一个并行版的std::accumulate


代码中将整体工作拆分成小任务交给每个线程去做,其中设置最小任务数,是为了避免产生太多的线程。


程序可能会在 *** 作数量为0的时候抛出异常。


比如,std::thread构造函数无法启动一个执行线程,就会抛出一个异常。


在这个算法中讨论异常处理,已经超出现阶段的讨论范围,这个问题我们将在第8章中再来讨论。


清单2.8 原生并行版的std::accumulate

template
struct accumulate_block
{
  void operator()(Iterator first,Iterator last,T& result)
  {
    result=std::accumulate(first,last,result);
  }
};

template
T parallel_accumulate(Iterator first,Iterator last,T init)
{
  unsigned long const length=std::distance(first,last);

  if(!length) // 1
    return init;

  unsigned long const min_per_thread=25;
  unsigned long const max_threads=
      (length+min_per_thread-1)/min_per_thread; // 2

  unsigned long const hardware_threads=
      std::thread::hardware_concurrency();

  unsigned long const num_threads=  // 3
      std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);

  unsigned long const block_size=length/num_threads; // 4

  std::vector results(num_threads);
  std::vector threads(num_threads-1);  // 5

  Iterator block_start=first;
  for(unsigned long i=0; i < (num_threads-1); ++i)
  {
    Iterator block_end=block_start;
    std::advance(block_end,block_size);  // 6
    threads[i]=std::thread(     // 7
        accumulate_block(),
        block_start,block_end,std::ref(results[i]));
    block_start=block_end;  // 8
  }
  accumulate_block()(
      block_start,last,results[num_threads-1]); // 9
  std::for_each(threads.begin(),threads.end(),
       std::mem_fn(&std::thread::join));  // 10

  return std::accumulate(results.begin(),results.end(),init); // 11
}

函数看起来很长,但不复杂。


如果输入的范围为空,就会得到init的值。


反之,如果范围内多于一个元素时,都需要用范围内元素的总数量除以线程()中最小任务数,从而确定启动线程的最大数量,这样能避免无谓的计算资源的浪费。


比如,一台32芯的机器上,只有5个数需要计算,却启动了32个线程。


计算量的最大值和硬件支持线程数中,较小的值为启动线程的数量


因为上下文频繁的切换会降低线程的性能,所以你肯定不想启动的线程数多于硬件支持的线程数量。


std::thread::hardware_concurrency()返回0,你可以选择一个合适的数作为你的选择;在本例中,我选择了"2"


你也不想在一台单核机器上启动太多的线程,因为这样反而会降低性能,有可能最终让你放弃使用并发。


每个线程中处理的元素数量,是范围中元素的总量除以线程的个数得出的


对于分配是否得当,我们会在后面讨论。


现在,确定了线程个数,通过创建一个std::vector容器存放中间结果,并为线程创建一个std::vector容器


这里需要注意的是,启动的线程数必须比num_threads1个,因为在启动之前已经有了一个线程(主线程)


使用简单的循环来启动线程:block_end迭代器指向当前块的末尾,并启动一个新线程为当前块累加结果


当迭代器指向当前块的末尾时,启动下一个块


启动所有线程后,中的线程会处理最终块的结果。


对于分配不均,因为知道最终块是哪一个,那么这个块中有多少个元素就无所谓了。


当累加最终块的结果后,可以等待std::for_each创建线程的完成(如同在清单2.7中做的那样),之后使用std::accumulate将所有结果进行累加


结束这个例子之前,需要明确:T类型的加法运算不满足结合律(比如,对于float型或double型,在进行加法 *** 作时,系统很可能会做截断 *** 作),因为对范围中元素的分组,会导致parallel_accumulate得到的结果可能与std::accumulate得到的结果不同。


同样的,这里对迭代器的要求更加严格:必须都是向前迭代器,而std::accumulate可以在只传入迭代器的情况下工作。


对于创建出results容器,需要保证T有默认构造函数。


对于算法并行,通常都要这样的修改;不过,需要根据算法本身的特性,选择不同的并行方式。


算法并行会在第8章有更加深入的讨论。


需要注意的:因为不能直接从一个线程中返回一个值,所以需要传递results容器的引用到线程中去。


另一个办法,通过地址来获取线程执行的结果;第4章中,我们将使用期望(futures)完成这种方案。


当线程运行时,所有必要的信息都需要传入到线程中去,包括存储计算结果的位置。


不过,并非总需如此:有时候这是识别线程的可行方案,可以传递一个标识数,例如清单2.7中的i


不过,当需要标识的函数在调用栈的深层,同时其他线程也可调用该函数,那么标识数就会变的捉襟见肘。


好消息是在设计C++的线程库时,就有预见了这种情况,在之后的实现中就给每个线程附加了唯一标识符。


2.5 识别线程

线程标识类型是std::thread::id,可以通过两种方式进行检索。


第一种,可以通过调用std::thread对象的成员函数get_id()来直接获取。


如果std::thread对象没有与任何执行线程相关联,get_id()将返回std::thread::type默认构造值,这个值表示没有线程


第二种,当前线程中调用std::this_thread::get_id()(这个函数定义在头文件中)也可以获得线程标识。


std::thread::id对象可以自由的拷贝和对比,因为标识符就可以复用。


如果两个对象的std::thread::id相等,那它们就是同一个线程,或者都没有线程


如果不等,那么就代表了两个不同线程,或者一个有线程,另一没有。


线程库不会限制你去检查线程标识是否一样,std::thread::id类型对象提供相当丰富的对比 *** 作;比如,提供为不同的值进行排序。


这意味着允许程序员将其当做为容器的键值,做排序,或做其他方式的比较。


按默认顺序比较不同值的std::thread::id,所以这个行为可预见的:当ab时,得a,等等。


标准库也提供std::hash容器,所以std::thread::id也可以作为无序容器的键值。


std::thread::id实例常用作检测线程是否需要进行一些 *** 作,比如:当用线程来分割一项工作(如清单2.8),主线程可能要做一些与其他线程不同的工作。


这种情况下,启动其他线程前,它可以将自己的线程ID通过std::this_thread::get_id()得到,并进行存储。


就是算法核心部分(所有线程都一样的),每个线程都要检查一下,其拥有的线程ID是否与初始线程的ID相同。


std::thread::id master_thread;
void some_core_part_of_algorithm()
{
  if(std::this_thread::get_id()==master_thread)
  {
    do_master_thread_work();
  }
  do_common_work();
}

另外,当前线程的std::thread::id将存储到一个数据结构中。


之后在这个结构体中对当前线程的ID与存储的线程ID做对比,来决定 *** 作是被允许,还是需要”(permitted/required)


同样,作为线程和本地存储不适配的替代方案,线程ID在容器中可作为键值。


例如,容器可以存储其掌控下每个线程的信息,或在多个线程中互传信息。


std::thread::id可以作为一个线程的通用标识符,当标识符只与语义相关(比如,数组的索引)时,就需要这个方案了。


也可以使用输出流(std::cout)来记录一个std::thread::id对象的值。


std::cout<

具体的输出结果是严格依赖于具体实现的,C++标准的唯一要求就是要保证ID比较结果相等的线程,必须有相同的输出。


2.6 本章总结

本章讨论了C++标准库中基本的线程管理方式:启动线程,等待结束和不等待结束(因为需要它们运行在后台)


并了解应该如何在线程启动前,向线程函数中传递参数,如何转移线程的所有权,如何使用线程组来分割任务。


最后,讨论了使用线程标识来确定关联数据,以及特殊线程的特殊解决方案。


虽然,现在已经可以纯粹的依赖线程,使用独立的数据,做独立的任务(如同清单2.8),但在某些情况下,线程确实需要有共享数据。


3章会讨论共享数据和线程的直接关系。


4章会讨论在(/没有)共享数据情况下的线程同步 *** 作。


本章主要内容

  • 共享数据带来的问题
  • 使用互斥量保护数据
  • 数据保护的替代方案

上一章中,我们已经对线程管理有所了解了,现在让我们来看一下共享数据的那些事


想象一下,你和你的朋友合租一个公寓,公寓中只有一个厨房和一个卫生间。


当你的朋友在卫生间时,你就会不能使用了(除非你们特别好,好到可以在同时使用一个房间)


这个问题也会出现在厨房,假如:厨房里有一个组合式烤箱,当在烤香肠的时候,也在做蛋糕,就可能得到我们不想要的食物(香肠味的蛋糕)


此外,在公共空间将一件事做到一半时,发现某些需要的东西被别人借走,或是当离开的一段时间内有些东西被变动了地方,这都会令我们不爽。


同样的问题,也困扰着线程。


当线程在访问共享数据的时候,必须定一些规矩,用来限定线程可访问的数据位。


还有,一个线程更新了共享数据,需要对其他线程进行通知。


从易用性的角度,同一进程中的多个线程进行数据共享,有利有弊。


错误的共享数据使用是产生并发bug的一个主要原因,并且后果要比香肠味的蛋糕更加严重。


本章就以在C++中进行安全的数据共享为主题。


避免上述及其他潜在问题的发生的同时,将共享数据的优势发挥到最大。


第3章 线程间共享数据 3.1 共享数据带来的问题

当涉及到共享数据时,问题很可能是因为共享数据修改所导致。


如果共享数据是只读的,那么只读 *** 作不会影响到数据,更不会涉及对数据的修改,所以所有线程都会获得同样的数据。


但是,当一个或多个线程要修改共享数据时,就会产生很多麻烦。


这种情况下,就必须小心谨慎,才能确保一切所有线程都工作正常。


不变量(invariants)的概念对程序员们编写的程序会有一定的帮助——对于特殊结构体的描述;比如,变量包含列表中的项数


不变量通常会在一次更新中被破坏,特别是比较复杂的数据结构,或者一次更新就要改动很大的数据结构。


双链表中每个节点都有一个指针指向列表中下一个节点,还有一个指针指向前一个节点。


其中不变量就是节点A中指向下一个节点B的指针,还有前向指针。


为了从列表中删除一个节点,其两边节点的指针都需要更新。


当其中一边更新完成时,不变量就被破坏了,直到另一边也完成更新;在两边都完成更新后,不变量就又稳定了。


从一个列表中删除一个节点的步骤如下(如图3.1)

  1. 找到要删除的节点N
  2. 更新前一个节点指向N的指针,让这个指针指向N的下一个节点
  3. 更新后一个节点指向N的指针,让这个指正指向N的前一个节点
  4. 删除节点N

3.1 从一个双链表中删除一个节点

图中bc在相同的方向上指向和原来已经不一致了,这就破坏了不变量。


线程间潜在问题就是修改共享数据,致使不变量遭到破坏。


当不做些事来确保在这个过程中不会有其他线程进行访问的话,可能就有线程访问到刚刚删除一边的节点;这样的话,线程就读取到要删除节点的数据(因为只有一边的连接被修改,如图3.1(b)),所以不变量就被破坏。


破坏不变量的后果是多样,当其他线程按从左往右的顺序来访问列表时,它将跳过被删除的节点。


在一方面,如有第二个线程尝试删除图中右边的节点,那么可能会让数据结构产生永久性的损坏,使程序崩溃。


无论结果如何,都是并行代码常见错误:条件竞争。


 

3.1.1 条件竞争

假设你去电影院买电影票。


如果去的是一家大电影院,有很多收银台,很多人就可以在同一时间买电影票。


当另一个收银台也在卖你想看的这场电影的电影票,那么你的座位选择范围就取决于在之前已预定的座位。


当只有少量的座位剩下,这就意味着,这可能是一场抢票比赛,看谁能抢到最后一张票。


这就是一个条件竞争的例子:你的座位(或者你的电影票)都取决于两种购买方式的相对顺序。


并发中竞争条件的形成,取决于一个以上线程的相对执行顺序,每个线程都抢着完成自己的任务。


大多数情况下,即使改变执行顺序,也是良性竞争,其结果可以接受。


例如,有两个线程同时向一个处理队列中添加任务,因为系统提供的不变量保持不变,所以谁先谁后都不会有什么影响。


当不变量遭到破坏时,才会产生条件竞争,比如双向链表的例子。


并发中对数据的条件竞争通常表示为恶性条件竞争,我们对不产生问题的良性条件竞争不感兴趣。


C++标准中也定义了数据竞争这个术语,一种特殊的条件竞争:并发的去修改一个独立对象(参见5.1.2),数据竞争是(可怕的)未定义行为的起因。


恶性条件竞争通常发生于完成对多于一个的数据块的修改时,例如,对两个连接指针的修改(如图3.1)


因为 *** 作要访问两个独立的数据块,独立的指令将会对数据块将进行修改,并且其中一个线程可能正在进行时,另一个线程就对数据块进行了访问。


因为出现的概率太低,条件竞争很难查找,也很难复现。


CPU指令连续修改完成后,即使数据结构可以让其他并发线程访问,问题再次复现的几率也相当低。


当系统负载增加时,随着执行数量的增加,执行序列的问题复现的概率也在增加,这样的问题只可能会出现在负载比较大的情况下。


条件竞争通常是时间敏感的,所以程序以调试模式运行时,它们常会完全消失,因为调试模式会影响程序的执行时间(即使影响不多)


当你以写多线程程序为生,条件竞争就会成为你的梦魇;编写软件时,我们会使用大量复杂的 *** 作,用来避免恶性条件竞争。


3.1.2 避免恶性条件竞争

这里提供一些方法来解决恶性条件竞争,最简单的办法就是对数据结构采用某种保护机制,确保只有进行修改的线程才能看到不变量被破坏时的中间状态。


从其他访问线程的角度来看,修改不是已经完成了,就是还没开始。


C++标准库提供很多类似的机制,下面会逐一介绍。


另一个选择是对数据结构和不变量的设计进行修改,修改完的结构必须能完成一系列不可分割的变化,也就是保证每个不变量保持稳定的状态,这就是所谓的无锁编程。


不过,这种方式很难得到正确的结果。


如果到这个级别,无论是内存模型上的细微差异,还是线程访问数据的能力,都会让工作变的复杂。


内存模型将在第5章讨论,无锁编程将在第7章讨论。


另一种处理条件竞争的方式是,使用事务的方式去处理数据结构的更新(这里的"处理"就如同对数据库进行更新一样)


所需的一些数据和读取都存储在事务日志中,然后将之前的 *** 作合为一步,再进行提交。


当数据结构被另一个线程修改后,或处理已经重启的情况下,提交就会无法进行,这称作为软件事务内存


理论研究中,这是一个很热门的研究领域。


这个概念将不会在本书中再进行介绍,因为在C++中没有对STM进行直接支持。


但是,基本思想会在后面提及。


保护共享数据结构的最基本的方式,是使用C++标准库提供的互斥量。


3.2 使用互斥量保护共享数据

当程序中有共享数据,肯定不想让其陷入条件竞争,或是不变量被破坏。


那么,将所有访问共享数据结构的代码都标记为互斥岂不是更好?这样任何一个线程在执行这些代码时,其他任何线程试图访问共享数据结构,就必须等到那一段代码执行结束。


于是,一个线程就不可能会看到被破坏的不变量,除非它本身就是修改共享数据的线程。


当访问共享数据前,使用互斥量将相关数据锁住,再当访问结束后,再将数据解锁。


线程库需要保证,当一个线程使用特定互斥量锁住共享数据时,其他的线程想要访问锁住的数据,都必须等到之前那个线程对数据进行解锁后,才能进行访问。


这就保证了所有线程能看到共享数据,而不破坏不变量。


互斥量是C++中一种最通用的数据保护机制,但它不是银d;精心组织代码来保护正确的数据(3.2.2),并在接口内部避免竞争条件(3.2.3)是非常重要的。


但互斥量自身也有问题,也会造成死锁(3.2.4),或是对数据保护的太多(或太少)(3.2.8)


3.2.1 C++中使用互斥量

C++中通过实例化std::mutex创建互斥量,通过调用成员函数lock()进行上锁,unlock()进行解锁。


不过,不推荐实践中直接去调用成员函数,因为调用成员函数就意味着,必须记住在每个函数出口都要去调用unlock(),也包括异常的情况。


C++标准库为互斥量提供了一个RAII语法的模板类std::lock_guard,其会在构造的时候提供已锁的互斥量,并在析构的时候进行解锁,从而保证了一个已锁的互斥量总是会被正确的解锁。


下面的程序清单中,展示了如何在多线程程序中,使用std::mutex构造的std::lock_guard实例,对一个列表进行访问保护。


std::mutexstd::lock_guard都在头文件中声明。


清单3.1 使用互斥量保护列表

#include 
#include 
#include 

std::list some_list;    // 1
std::mutex some_mutex;    // 2

void add_to_list(int new_value)
{
  std::lock_guard guard(some_mutex);    // 3
  some_list.push_back(new_value);
}

bool list_contains(int value_to_find)
{
  std::lock_guard guard(some_mutex);    // 4
  return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end();
}

清单3.1中有一个全局变量,这个全局变量被一个全局的互斥量保护


add_to_list()list_contains()函数中使用std::lock_guard,使得这两个函数中对数据的访问是互斥的:list_contains()不可能看到正在被add_to_list()修改的列表。


虽然某些情况下,使用全局变量没问题,但在大多数情况下,互斥量通常会与保护的数据放在同一个类中,而不是定义成全局变量。


这是面向对象设计的准则:将其放在一个类中,就可让他们联系在一起,也可对类的功能进行封装,并进行数据保护。


在这种情况下,函数add_to_listlist_contains可以作为这个类的成员函数。


互斥量和要保护的数据,在类中都需要定义为private成员,这会让访问数据的代码变的清晰,并且容易看出在什么时候对互斥量上锁。


当所有成员函数都会在调用时对数据上锁,结束时对数据解锁,那么就保证了数据访问时不变量不被破坏。


当然,也不是总是那么理想,聪明的你一定注意到了:当其中一个成员函数返回的是保护数据的指针或引用时,会破坏对数据的保护。


具有访问能力的指针或引用可以访问(并可能修改)被保护的数据,而不会被互斥锁限制。


互斥量保护的数据需要对接口的设计相当谨慎,要确保互斥量能锁住任何对保护数据的访问,并且不留后门。


3.2.2 精心组织代码来保护共享数据

使用互斥量来保护数据,并不是仅仅在每一个成员函数中都加入一个std::lock_guard对象那么简单;一个迷失的指针或引用,将会让这种保护形同虚设。


不过,检查迷失指针或引用是很容易的,只要没有成员函数通过返回值或者输出参数的形式向其调用者返回指向受保护数据的指针或引用,数据就是安全的。


如果你还想往祖坟上刨,就没这么简单了。


在确保成员函数不会传出指针或引用的同时,检查成员函数是否通过指针或引用的方式来调用也是很重要的(尤其是这个 *** 作不在你的控制下时)


函数可能没在互斥量保护的区域内,存储着指针或者引用,这样就很危险。


更危险的是:将保护数据作为一个运行时参数,如同下面清单中所示那样。


清单3.2无意中传递了保护数据的引用

class some_data
{
  int a;
  std::string b;
public:
  void do_something();
};

class data_wrapper
{
private:
  some_data data;
  std::mutex m;
public:
  template
  void process_data(Function func)
  {
    std::lock_guard l(m);
    func(data);    // 1 传递“保护”数据给用户函数
  }
};

some_data* unprotected;

void malicious_function(some_data& protected_data)
{
  unprotected=&protected_data;
}

data_wrapper x;
void foo()
{
  x.process_data(malicious_function);    // 2 传递一个恶意函数
  unprotected->do_something();    // 3 在无保护的情况下访问保护数据
}

例子中process_data看起来没有任何问题,std::lock_guard对数据做了很好的保护,但调用用户提供的函数func,就意味着foo能够绕过保护机制将函数malicious_function传递进去,在没有锁定互斥量的情况下调用do_something()


这段代码的问题在于根本没有保护,只是将所有可访问的数据结构代码标记为互斥。


函数foo()中调用unprotected->do_something()的代码未能被标记为互斥。


这种情况下,C++线程库无法提供任何帮助,只能由程序员来使用正确的互斥锁来保护数据。


从乐观的角度上看,还是有方法可循的:切勿将受保护数据的指针或引用传递到互斥锁作用域之外,无论是函数返回值,还是存储在外部可见内存,亦或是以参数的形式传递到用户提供的函数中去。


虽然这是在使用互斥量保护共享数据时常犯的错误,但绝不仅仅是一个潜在的陷阱而已。


下一节中,你将会看到,即便是使用了互斥量对数据进行了保护,条件竞争依旧可能存在。


3.2.3 发现接口内在的条件竞争

因为使用了互斥量或其他机制保护了共享数据,就不必再为条件竞争所担忧吗?并不是,你依旧需要确定数据受到了保护。


回想之前双链表的例子,为了能让线程安全地删除一个节点,需要确保防止对这三个节点(待删除的节点及其前后相邻的节点)的并发访问。


如果只对指向每个节点的指针进行访问保护,那就和没有使用互斥量一样,条件竞争仍会发生——除了指针,整个数据结构和整个删除 *** 作需要保护。


这种情况下最简单的解决方案就是使用互斥量来保护整个链表,如清单3.1所示。


尽管链表的个别 *** 作是安全的,但不意味着你就能走出困境;即使在一个很简单的接口中,依旧可能遇到条件竞争。


例如,构建一个类似于std::stack结构的栈(清单3.3),除了构造函数和swap()以外,需要对std::stack提供五个 *** 作:push()一个新元素进栈,pop()一个元素出栈,top()查看栈顶元素,empty()判断栈是否是空栈,size()了解栈中有多少个元素。


即使修改了top(),使其返回一个拷贝而非引用(即遵循了3.2.2节的准则),对内部数据使用一个互斥量进行保护,不过这个接口仍存在条件竞争。


这个问题不仅存在于基于互斥量实现的接口中,在无锁实现的接口中,条件竞争依旧会产生。


这是接口的问题,与其实现方式无关。


清单3.3 std::stack容器的实现

template >
class stack
{
public:
  explicit stack(const Container&);
  explicit stack(Container&& = Container());
  template  explicit stack(const Alloc&);
  template  stack(const Container&, const Alloc&);
  template  stack(Container&&, const Alloc&);
  template  stack(stack&&, const Alloc&);

  bool empty() const;
  size_t size() const;
  T& top();
  T const& top() const;
  void push(T const&);
  void push(T&&);
  void pop();
  void swap(stack&&);
};

虽然empty()size()可能在被调用并返回时是正确的,但其的结果是不可靠的;当它们返回后,其他线程就可以自由地访问栈,并且可能push()多个新元素到栈中,也可能pop()一些已在栈中的元素。


这样的话,之前从empty()size()得到的结果就有问题了。


特别地,当栈实例是非共享的,如果栈非空,使用empty()检查再调用top()访问栈顶部的元素是安全的。


如下代码所示:

stack s;
if (! s.empty()){    // 1
  int const value = s.top();    // 2
  s.pop();    // 3
  do_something(value);
}

以上是单线程安全代码:对一个空栈使用top()是未定义行为。


对于共享的栈对象,这样的调用顺序就不再安全了,因为在调用empty()和调用top()之间,可能有来自另一个线程的pop()调用并删除了最后一个元素。


这是一个经典的条件竞争,使用互斥量对栈内部数据进行保护,但依旧不能阻止条件竞争的发生,这就是接口固有的问题。


怎么解决呢?问题发生在接口设计上,所以解决的方法也就是改变接口设计。


有人会问:怎么改?在这个简单的例子中,当调用top()时,发现栈已经是空的了,那么就抛出异常。


虽然这能直接解决这个问题,但这是一个笨拙的解决方案,这样的话,即使empty()返回false的情况下,你也需要异常捕获机制。


本质上,这样的改变会让empty()成为一个多余函数。


当仔细的观察过之前的代码段,就会发现另一个潜在的条件竞争在调用top()pop()之间。


假设两个线程运行着前面的代码,并且都引用同一个栈对象s


这并非罕见的情况,当为性能而使用线程时,多个线程在不同的数据上执行相同的 *** 作是很平常的,并且共享同一个栈可以将工作分摊给它们。


假设,一开始栈中只有两个元素,这时任一线程上的empty()top()都存在竞争,只需要考虑可能的执行顺序即可。


当栈被一个内部互斥量所保护时,只有一个线程可以调用栈的成员函数,所以调用可以很好地交错,并且do_something()是可以并发运行的。


在表3.1中,展示一种可能的执行顺序。


3.1 一种可能执行顺序

Thread A

Thread B

if (!s.empty);

if(!s.empty);

int const value = s.top();

int const value = s.top();

s.pop();

do_something(value);

s.pop();

do_something(value);

当线程运行时,调用两次top(),栈没被修改,所以每个线程能得到同样的值。


不仅是这样,在调用top()函数调用的过程中(两次)pop()函数都没有被调用。


这样,在其中一个值再读取的时候,虽然不会出现写后读的情况,但其值已被处理了两次。


这种条件竞争,比未定义的empty()/top()竞争更加严重;虽然其结果依赖于do_something()的结果,但因为看起来没有任何错误,就会让这个Bug很难定位。


这就需要接口设计上有较大的改动,提议之一就是使用同一互斥量来保护top()pop()


Tom Cargill[1]指出当一个对象的拷贝构造函数在栈中抛出一个异常,这样的处理方式就会有问题。


Herb Sutter[2]看来,这个问题可以从异常安全的角度完美解决,不过潜在的条件竞争,可能会组成一些新的组合。


说一些大家没有意识到的问题:假设有一个stack>vector是一个动态容器,当你拷贝一个vetcor,标准库会从堆上分配很多内存来完成这次拷贝。


当这个系统处在重度负荷,或有严重的资源限制的情况下,这种内存分配就会失败,所以vector的拷贝构造函数可能会抛出一个std::bad_alloc异常。


vector中存有大量元素时,这种情况发生的可能性更大。


pop()函数返回d出值(也就是从栈中将这个值移除),会有一个潜在的问题:这个值被返回到调用函数的时候,栈才被改变;但当拷贝数据的时候,调用函数抛出一个异常会怎么样? 如果事情真的发生了,要d出的数据将会丢失;它的确从栈上移出了,但是拷贝失败了!std::stack的设计人员将这个 *** 作分为两部分:先获取顶部元素(top()),然后从栈中移除(pop())


这样,在不能安全的将元素拷贝出去的情况下,栈中的这个数据还依旧存在,没有丢失。


当问题是堆空间不足,应用可能会释放一些内存,然后再进行尝试。


不幸的是,这样的分割却制造了本想避免或消除的条件竞争。


幸运的是,我们还有的别的选项,但是使用这些选项是要付出代价的。


选项1 传入一个引用

第一个选项是将变量的引用作为参数,传入pop()函数中获取想要的d出值

std::vector result;
some_stack.pop(result);

大多数情况下,这种方式还不错,但有明显的缺点:需要构造出一个栈中类型的实例,用于接收目标值。


对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看,都是不划算。


对于其他的类型,这样也不总能行得通,因为构造函数需要的一些参数,在代码的这个阶段不一定可用。


最后,需要可赋值的存储类型,这是一个重大限制:即使支持移动构造,甚至是拷贝构造(从而允许返回一个值),很多用户自定义类型可能都不支持赋值 *** 作。


选项2:无异常抛出的拷贝构造函数或移动构造函数

对于有返回值的pop()函数来说,只有异常安全方面的担忧(当返回值时可以抛出一个异常)


很多类型都有拷贝构造函数,它们不会抛出异常,并且随着新标准中对右值引用的支持(详见附录AA.1),很多类型都将会有一个移动构造函数,即使他们和拷贝构造函数做着相同的事情,它也不会抛出异常。


一个有用的选项可以限制对线程安全的栈的使用,并且能让栈安全的返回所需的值,而不会抛出异常。


虽然安全,但非可靠。


尽管能在编译时,使用std::is_no_throw_copy_constructiblestd::is_nothrow_move_constructible类型特征,让拷贝或移动构造函数不抛出异常,但是这种方式的局限性太强。


很多用户定义类型有可抛出异常的拷贝构造函数,没有移动构造函数;或是,都不抛出异常的构造函数(这种改变会随着C++11中左值引用,越来越为大众所用)


如果类型不能存储线程安全的堆栈,想想该是多么的不幸。


选项3:返回指向d出值的指针

第三个选择是返回一个指向d出元素的指针,而不是直接返回值。


指针的优势是自由拷贝,并且不会产生异常,这样你就能避免Cargill提到的异常问题了。


缺点就是返回一个指针需要对对象的内存分配进行管理,对于简单数据类型(比如:int),内存管理的开销要远大于直接返回值。


对于选择这个方案的接口,使用std::shared_ptr是个不错的选择;不仅能避免内存泄露(因为当对象中指针销毁时,对象也会被销毁),而且标准库能够完全控制内存分配方案,也就不需要newdelete *** 作。


这种优化是很重要的:因为堆栈中的每个对象,都需要用new进行独立的内存分配,相较于非线程安全版本,这个方案的开销相当大。


选项4选项1 + 选项2”选项1 + 选项3”

对于通用的代码来说,灵活性不应忽视。


当你已经选择了选项23时,再去选择1也是很容易的。


这些选项提供给用户,让用户自己选择对于他们自己来说最合适,最经济的方案。


例:定义线程安全的堆栈

清单3.4中是一个接口没有条件竞争的堆栈类定义,它实现了选项1和选项3:重载了pop(),使用一个局部引用去存储d出值,并返回一个std::shared_ptr<>对象。


它有一个简单的接口,只有两个函数:push()pop();

清单3.4 线程安全的堆栈类定义(概述)

#include 
#include   // For std::shared_ptr<>

struct empty_stack: std::exception
{
  const char* what() const throw();
};

template
class threadsafe_stack
{
public:
  threadsafe_stack();
  threadsafe_stack(const threadsafe_stack&);
  threadsafe_stack& operator=(const threadsafe_stack&) = delete; // 1 赋值 *** 作被删除

  void push(T new_value);
  std::shared_ptr pop();
  void pop(T& value);
  bool empty() const;
};

削减接口可以获得最大程度的安全,甚至限制对栈的一些 *** 作。


栈是不能直接赋值的,因为赋值 *** 作已经删除了(详见附录AA.2),并且这里没有swap()函数。


栈可以拷贝的,假设栈中的元素可以拷贝。


当栈为空时,pop()函数会抛出一个empty_stack异常,所以在empty()函数被调用后,其他部件还能正常工作。


如选项3描述的那样,使用std::shared_ptr可以避免内存分配管理的问题,并避免多次使用newdelete *** 作。


堆栈中的五个 *** 作,现在就剩下三个:push(), pop()empty()(这里empty()都有些多余)


简化接口更有利于数据控制,可以保证互斥量将一个 *** 作完全锁住。


下面的代码将展示一个简单的实现——封装std::stack<>的线程安全堆栈。


清单3.5 扩充(线程安全)堆栈

#include 
#include 
#include 
#include 

struct empty_stack: std::exception
{
  const char* what() const throw() {
    return "empty stack!";
  };
};

template
class threadsafe_stack
{
private:
  std::stack data;
  mutable std::mutex m;

public:
  threadsafe_stack()
    : data(std::stack()){}

  threadsafe_stack(const threadsafe_stack& other)
  {
    std::lock_guard lock(other.m);
    data = other.data; // 1 在构造函数体中的执行拷贝
  }

  threadsafe_stack& operator=(const threadsafe_stack&) = delete;

  void push(T new_value)
  {
    std::lock_guard lock(m);
    data.push(new_value);
  }

  std::shared_ptr pop()
  {
    std::lock_guard lock(m);
    if(data.empty()) throw empty_stack(); // 在调用pop前,检查栈是否为空

    std::shared_ptr const res(std::make_shared(data.top())); // 在修改堆栈前,分配出返回值
    data.pop();
    return res;
  }

  void pop(T& value)
  {
    std::lock_guard lock(m);
    if(data.empty()) throw empty_stack();

    value=data.top();
    data.pop();
  }

  bool empty() const
  {
    std::lock_guard lock(m);
    return data.empty();
  }
};

堆栈可以拷贝——拷贝构造函数对互斥量上锁,再拷贝堆栈。


构造函数体中的拷贝使用互斥量来确保复制结果的正确性,这样的方式比成员初始化列表好。


之前对top()pop()函数的讨论中,恶性条件竞争已经出现,因为锁的粒度太小,需要保护的 *** 作并未全覆盖到。


不过,锁住的颗粒过大同样会有问题。


还有一个问题,一个全局互斥量要去保护全部共享数据,在一个系统中存在有大量的共享数据时,因为线程可以强制运行,甚至可以访问不同位置的数据,抵消了并发带来的性能提升。


在第一版为多处理器系统设计Linux内核中,就使用了一个全局内核锁。


虽然这个锁能正常工作,但在双核处理系统的上的性能要比两个单核系统的性能差很多,四核系统就更不能提了。


太多请求去竞争占用内核,使得依赖于处理器运行的线程没有办法很好的工作。


随后修正的Linux内核加入了一个细粒度锁方案,因为少了很多内核竞争,这时四核处理系统的性能就和单核处理的四倍差不多了。


使用多个互斥量保护所有的数据,细粒度锁也有问题。


如前所述,当增大互斥量覆盖数据的粒度时,只需要锁住一个互斥量。


但是,这种方案并非放之四海皆准,比如:互斥量正在保护一个独立类的实例;这种情况下,锁的状态的下一个阶段,不是离开锁定区域将锁定区域还给用户,就是有独立的互斥量去保护这个类的全部实例。


当然,这两种方式都不理想。


一个给定 *** 作需要两个或两个以上的互斥量时,另一个潜在的问题将出现:死锁。


与条件竞争完全相反——不同的两个线程会互相等待,从而什么都没做。


3.2.4 死锁:问题描述及解决方案

试想有一个玩具,这个玩具由两部分组成,必须拿到这两个部分,才能够玩。


例如,一个玩具鼓,需要一个鼓锤和一个鼓才能玩。


现在有两个小孩,他们都很喜欢玩这个玩具。


当其中一个孩子拿到了鼓和鼓锤时,那就可以尽情的玩耍了。


当另一孩子想要玩,他就得等待另一孩子玩完才行。


再试想,鼓和鼓锤被放在不同的玩具箱里,并且两个孩子在同一时间里都想要去敲鼓。


之后,他们就去玩具箱里面找这个鼓。


其中一个找到了鼓,并且另外一个找到了鼓锤。


现在问题就来了,除非其中一个孩子决定让另一个先玩,他可以把自己的那部分给另外一个孩子;但当他们都紧握着自己所有的部分而不给予,那么这个鼓谁都没法玩。


现在没有孩子去争抢玩具,但线程有对锁的竞争:一对线程需要对他们所有的互斥量做一些 *** 作,其中每个线程都有一个互斥量,且等待另一个解锁。


这样没有线程能工作,因为他们都在等待对方释放互斥量。


这种情况就是死锁,它的最大问题就是由两个或两个以上的互斥量来锁定一个 *** 作。


避免死锁的一般建议,就是让两个互斥量总以相同的顺序上锁:总在互斥量B之前锁住互斥量A,就永远不会死锁。


某些情况下是可以这样用,因为不同的互斥量用于不同的地方。


不过,事情没那么简单,比如:当有多个互斥量保护同一个类的独立实例时,一个 *** 作对同一个类的两个不同实例进行数据的交换 *** 作,为了保证数据交换 *** 作的正确性,就要避免数据被并发修改,并确保每个实例上的互斥量都能锁住自己要保护的区域。


不过,选择一个固定的顺序(例如,实例提供的第一互斥量作为第一个参数,提供的第二个互斥量为第二个参数),可能会适得其反:在参数交换了之后,两个线程试图在相同的两个实例间进行数据交换时,程序又死锁了!

很幸运,C++标准库有办法解决这个问题,std::lock——可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险)


下面的程序清单中,就来看一下怎么在一个简单的交换 *** 作中使用std::lock


清单3.6 交换 *** 作中使用std::lock()std::lock_guard

// 这里的std::lock()需要包含头文件
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
  some_big_object some_detail;
  std::mutex m;
public:
  X(some_big_object const& sd):some_detail(sd){}

  friend void swap(X& lhs, X& rhs)
  {
    if(&lhs==&rhs)
      return;
    std::lock(lhs.m,rhs.m); // 1
    std::lock_guard lock_a(lhs.m,std::adopt_lock); // 2
    std::lock_guard lock_b(rhs.m,std::adopt_lock); // 3
    swap(lhs.some_detail,rhs.some_detail);
  }
};

首先,检查参数是否是不同的实例,因为 *** 作试图获取std::mutex对象上的锁,所以当其被获取时,结果很难预料。


(一个互斥量可以在同一线程上多次上锁,标准库中std::recursive_mutex提供这样的功能。


详情见3.3.3)


然后,调用std::lock()锁住两个互斥量,并且两个std:lock_guard实例已经创建好②③


提供std::adopt_lock参数除了表示std::lock_guard对象可获取锁之外,还将锁交由std::lock_guard对象管理,而不需要std::lock_guard对象再去构建新的锁。


这样,就能保证在大多数情况下,函数退出时互斥量能被正确的解锁(保护 *** 作可能会抛出一个异常),也允许使用一个简单的“return”作为返回。


还有,需要注意的是,当使用std::lock去锁lhs.mrhs.m时,可能会抛出异常;这种情况下,异常会传播到std::lock之外。


std::lock成功的获取一个互斥量上的锁,并且当其尝试从另一个互斥量上再获取锁时,就会有异常抛出,第一个锁也会随着异常的产生而自动释放,所以std::lock要么将两个锁都锁住,要不一个都不锁。


虽然std::lock可以在这情况下(获取两个以上的锁)避免死锁,但它没办法帮助你获取其中一个锁。


这时,不得不依赖于开发者的纪律性(译者:也就是经验),来确保你的程序不会死锁。


这并不简单:死锁是多线程编程中一个令人相当头痛的问题,并且死锁经常是不可预见的,因为在大多数时间里,所有工作都能很好的完成。


不过,也一些相对简单的规则能帮助写出无死锁的代码。


3.2.5 避免死锁的进阶指导

虽然锁是产生死锁的一般原因,但也不排除死锁出现在其他地方。


无锁的情况下,仅需要每个std::thread对象调用join(),两个线程就能产生死锁。


这种情况下,没有线程可以继续运行,因为他们正在互相等待。


这种情况很常见,一个线程会等待另一个线程,其他线程同时也会等待第一个线程结束,所以三个或更多线程的互相等待也会发生死锁。


为了避免死锁,这里的指导意见为:当机会来临时,不要拱手让人。


以下提供一些个人的指导建议,如何识别死锁,并消除其他线程的等待。


避免嵌套锁

第一个建议往往是最简单的:一个线程已获得一个锁时,再别去获取第二个。


如果能坚持这个建议,因为每个线程只持有一个锁,锁上就不会产生死锁。


即使互斥锁造成死锁的最常见原因,也可能会在其他方面受到死锁的困扰(比如:线程间的互相等待)


当你需要获取多个锁,使用一个std::lock来做这件事(对获取锁的 *** 作上锁),避免产生死锁。


避免在持有锁时调用用户提供的代码

第二个建议是次简单的:因为代码是用户提供的,你没有办法确定用户要做什么;用户程序可能做任何事情,包括获取锁。


你在持有锁的情况下,调用用户提供的代码;如果用户代码要获取一个锁,就会违反第一个指导意见,并造成死锁(有时,这是无法避免的)


当你正在写一份通用代码,例如3.2.3中的栈,每一个 *** 作的参数类型,都在用户提供的代码中定义,就需要其他指导意见来帮助你。


使用固定顺序获取锁

当硬性条件要求你获取两个以上(包括两个)的锁,并且不能使用std::lock单独 *** 作来获取它们;那么最好在每个线程上,用固定的顺序获取它们获取它们()


3.2.4节中提到一种当需要获取两个互斥量时,避免死锁的方法:关键是如何在线程之间,以一定的顺序获取锁。


一些情况下,这种方式相对简单。


比如,3.2.3节中的栈——每个栈实例中都内置有互斥量,但是对数据成员存储的 *** 作上,栈就需要带调用用户提供的代码。


虽然,可以添加一些约束,对栈上存储的数据项不做任何 *** 作,对数据项的处理仅限于栈自身。


这会给用户提供的栈增加一些负担,但是一个容器很少去访问另一个容器中存储的数据,即使发生了也会很明显,所以这对于通用栈来说并不是一个特别沉重的负担。


其他情况下,这就不会那么简单了,例如:3.2.4节中的交换 *** 作,这种情况下你可能同时锁住多个互斥量(但是有时不会发生)


当回看3.1节中那个链表连接例子时,将会看到列表中的每个节点都会有一个互斥量保护。


为了访问列表,线程必须获取他们感兴趣节点上的互斥锁。


当一个线程删除一个节点,它必须获取三个节点上的互斥锁:将要删除的节点,两个邻接节点(因为他们也会被修改)


同样的,为了遍历链表,线程必须保证在获取当前节点的互斥锁前提下,获得下一个节点的锁,要保证指向下一个节点的指针不会同时被修改。


一旦下一个节点上的锁被获取,那么第一个节点的锁就可以释放了,因为没有持有它的必要性了。


这种手递手锁的模式允许多个线程访问列表,为每一个访问的线程提供不同的节点。


但是,为了避免死锁,节点必须以同样的顺序上锁:如果两个线程试图用互为反向的顺序,使用手递手锁遍历列表,他们将执行到列表中间部分时,发生死锁。


当节点AB在列表中相邻,当前线程可能会同时尝试获取AB上的锁。


另一个线程可能已经获取了节点B上的锁,并且试图获取节点A上的锁——经典的死锁场景。


AC节点中的B节点正在被删除时,如果有线程在已获取AC上的锁后,还要获取B节点上的锁时,当一个线程遍历列表的时候,这样的情况就可能发生死锁。


这样的线程可能会试图首先锁住A节点或C节点(根据遍历的方向),但是后面就会发现,它无法获得B上的锁,因为线程在执行删除任务的时候,已经获取了B上的锁,并且同时也获取了AC上的锁。


这里提供一种避免死锁的方式,定义遍历的顺序,所以一个线程必须先锁住A才能获取B的锁,在锁住B之后才能获取C的锁。


这将消除死锁发生的可能性,在不允许反向遍历的列表上。


类似的约定常被用来建立其他的数据结构。


使用锁的层次结构

虽然,这对于定义锁的顺序,的确是一个特殊的情况,但锁的层次的意义在于提供对运行时约定是否被坚持的检查。


这个建议需要对你的应用进行分层,并且识别在给定层上所有可上锁的互斥量。


当代码试图对一个互斥量上锁,在该层锁已被低层持有时,上锁是不允许的。


你可以在运行时对其进行检查,通过分配层数到每个互斥量上,以及记录被每个线程上锁的互斥量。


下面的代码列表中将展示两个线程如何使用分层互斥。


清单3.7 使用层次锁来避免死锁

hierarchical_mutex high_level_mutex(10000); // 1
hierarchical_mutex low_level_mutex(5000);  // 2

int do_low_level_stuff();

int low_level_func()
{
  std::lock_guard lk(low_level_mutex); // 3
  return do_low_level_stuff();
}

void high_level_stuff(int some_param);

void high_level_func()
{
  std::lock_guard lk(high_level_mutex); // 4
  high_level_stuff(low_level_func()); // 5
}

void thread_a()  // 6
{
  high_level_func();
}

hierarchical_mutex other_mutex(100); // 7
void do_other_stuff();

void other_stuff()
{
  high_level_func();  // 8
  do_other_stuff();
}

void thread_b() // 9
{
  std::lock_guard lk(other_mutex); // 10
  other_stuff();
}

thread_a()遵守规则,所以它运行的没问题。


另一方面,thread_b()无视规则,因此在运行的时候肯定会失败。


thread_a()调用high_level_func(),让high_level_mutex上锁(其层级值为10000),为了获取high_level_stuff()的参数对互斥量上锁,之后调用low_level_func()


low_level_func()会对low_level_mutex上锁,这就没有问题了,因为这个互斥量有一个低层值5000


thread_b()运行就不会顺利了。


首先,它锁住了other_mutex,这个互斥量的层级值只有100


这就意味着,超低层级的数据已被保护。


other_stuff()调用high_level_func()时,就违反了层级结构:high_level_func()试图获取high_level_mutex,这个互斥量的层级值是10000,要比当前层级值100大很多。


因此hierarchical_mutex将会产生一个错误,可能会是抛出一个异常,或直接终止程序。


在层级互斥量上产生死锁,是不可能的,因为互斥量本身会严格遵循约定顺序,进行上锁。


这也意味,当多个互斥量在是在同一级上时,不能同时持有多个锁,所以手递手锁的方案需要每个互斥量在一条链上,并且每个互斥量都比其前一个有更低的层级值,这在某些情况下无法实现。


例子也展示了另一点,std::lock_guard<>模板与用户定义的互斥量类型一起使用。


虽然hierarchical_mutex不是C++标准的一部分,但是它写起来很容易;一个简单的实现在列表3.8中展示出来。


尽管它是一个用户定义类型,它可以用于std::lock_guard<>模板中,因为它的实现有三个成员函数为了满足互斥量 *** 作:lock(), unlock() try_lock()


虽然你还没见过try_lock()怎么使用,但是其使用起来很简单:当互斥量上的锁被一个线程持有,它将返回false,而不是等待调用的线程,直到能够获取互斥量上的锁为止。


std::lock()的内部实现中,try_lock()会作为避免死锁算法的一部分。


列表3.8 简单的层级互斥量实现

class hierarchical_mutex
{
  std::mutex internal_mutex;

  unsigned long const hierarchy_value;
  unsigned long previous_hierarchy_value;

  static thread_local unsigned long this_thread_hierarchy_value;  // 1

  void check_for_hierarchy_violation()
  {
    if(this_thread_hierarchy_value <= hierarchy_value)  // 2
    {
      throw std::logic_error(“mutex hierarchy violated”);
    }
  }

  void update_hierarchy_value()
  {
    previous_hierarchy_value=this_thread_hierarchy_value;  // 3
    this_thread_hierarchy_value=hierarchy_value;
  }

public:
  explicit hierarchical_mutex(unsigned long value):
      hierarchy_value(value),
      previous_hierarchy_value(0)
  {}

  void lock()
  {
    check_for_hierarchy_violation();
    internal_mutex.lock();  // 4
    update_hierarchy_value();  // 5
  }

  void unlock()
  {
    this_thread_hierarchy_value=previous_hierarchy_value;  // 6
    internal_mutex.unlock();
  }

  bool try_lock()
  {
    check_for_hierarchy_violation();
    if(!internal_mutex.try_lock())  // 7
      return false;
    update_hierarchy_value();
    return true;
  }
};
thread_local unsigned long
     hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);  // 8

这里重点是使用了thread_local的值来代表当前线程的层级值:this_thread_hierarchy_value


它被初始化为最大值,所以最初所有线程都能被锁住。


因为其声明中有thread_local,所以每个线程都有其拷贝副本,这样线程中变量状态完全独立,当从另一个线程进行读取时,变量的状态也完全独立。


参见附录AA.8节,有更多与thread_local相关的内容。


所以,第一次线程锁住一个hierarchical_mutex时,this_thread_hierarchy_value的值是ULONG_MAX


由于其本身的性质,这个值会大于其他任何值,所以会通过check_for_hierarchy_vilation()的检查。


在这种检查方式下,lock()代表内部互斥锁已被锁住


一旦成功锁住,你可以更新层级值了


当你现在锁住另一个hierarchical_mutex时,还持有第一个锁,this_thread_hierarchy_value的值将会显示第一个互斥量的层级值。


第二个互斥量的层级值必须小于已经持有互斥量检查函数才能通过。


现在,最重要的是为当前线程存储之前的层级值,所以你可以调用unlock()对层级值进行保存;否则,就锁不住任何互斥量(第二个互斥量的层级数高于第一个互斥量),即使线程没有持有任何锁。


因为保存了之前的层级值,只有当持有internal_mutex,且在解锁内部互斥量之前存储它的层级值,才能安全的将hierarchical_mutex自身进行存储。


这是因为hierarchical_mutex被内部互斥量的锁所保护着。


try_lock()lock()的功能相似,除了在调用internal_mutextry_lock()失败时,不能持有对应锁,所以不必更新层级值,并直接返回false


虽然是运行时检测,但是它没有时间依赖性——不必去等待那些导致死锁出现的罕见条件。


同时,设计过程需要去拆分应用,互斥量在这样的情况下可以消除可能导致死锁的可能性。


这样的设计练习很有必要去做一下,即使你之后没有去做,代码也会在运行时进行检查。


超越锁的延伸扩展

如我在本节开头提到的那样,死锁不仅仅会发生在锁之间;死锁也会发生在任何同步构造中(可能会产生一个等待循环),因此这方面也需要有指导意见,例如:要去避免获取嵌套锁等待一个持有锁的线程是一个很糟糕的决定,因为线程为了能继续运行可能需要获取对应的锁。


类似的,如果去等待一个线程结束,它应该可以确定这个线程的层级,这样一个线程只需要等待比起层级低的线程结束即可。


可以用一个简单的办法去确定,以添加的线程是否在同一函数中被启动,如同在3.1.2节和3.3节中描述的那样。


当代码已经能规避死锁,std::lock()std::lock_guard能组成简单的锁覆盖大多数情况,但是有时需要更多的灵活性。


在这些情况,可以使用标准库提供的std::unique_lock模板。


std::lock_guard,这是一个参数化的互斥量模板类,并且它提供很多RAII类型锁用来管理std::lock_guard类型,可以让代码更加灵活。


3.2.6 std::unique_lock——灵活的锁

std::unqiue_lock使用更为自由的不变量,这样std::unique_lock实例不会总与互斥量的数据类型相关,使用起来要比std:lock_guard更加灵活。


首先,可将std::adopt_lock作为第二个参数传入构造函数,对互斥量进行管理;也可以将std::defer_lock作为第二个参数传递进去,表明互斥量应保持解锁状态。


这样,就可以被std::unique_lock对象(不是互斥量)lock()函数的所获取,或传递std::unique_lock对象到std::lock()中。


清单3.6可以轻易的转换为清单3.9,使用std::unique_lockstd::defer_lock,而非std::lock_guardstd::adopt_lock


代码长度相同,几乎等价,唯一不同的就是:std::unique_lock会占用比较多的空间,并且比std::lock_guard稍慢一些。


保证灵活性要付出代价,这个代价就是允许std::unique_lock实例不带互斥量:信息已被存储,且已被更新。


清单3.9 交换 *** 作中std::lock()std::unique_lock的使用

class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
  some_big_object some_detail;
  std::mutex m;
public:
  X(some_big_object const& sd):some_detail(sd){}
  friend void swap(X& lhs, X& rhs)
  {
    if(&lhs==&rhs)
      return;
    std::unique_lock lock_a(lhs.m,std::defer_lock); // 1 
    std::unique_lock lock_b(rhs.m,std::defer_lock); // 1 std::def_lock 留下未上锁的互斥量
    std::lock(lock_a,lock_b); // 2 互斥量在这里上锁
    swap(lhs.some_detail,rhs.some_detail);
  }
};

列表3.9中,因为std::unique_lock支持lock(), try_lock()unlock()成员函数,所以能将std::unique_lock对象传递到std::lock()


这些同名的成员函数在低层做着实际的工作,并且仅更新std::unique_lock实例中的标志,来确定该实例是否拥有特定的互斥量,这个标志是为了确保unlock()在析构函数中被正确调用。


如果实例拥有互斥量,那么析构函数必须调用unlock();但当实例中没有互斥量时,析构函数就不能去调用unlock()


这个标志可以通过owns_lock()成员变量进行查询。


可能如你期望的那样,这个标志被存储在某个地方。


因此,std::unique_lock对象的体积通常要比std::lock_guard对象大,当使用std::unique_lock替代std::lock_guard,因为会对标志进行适当的更新或检查,就会做些轻微的性能惩罚。


std::lock_guard已经能够满足你的需求,那么还是建议你继续使用它。


当需要更加灵活的锁时,最好选择std::unique_lock,因为它更适合于你的任务。


你已经看到一个递延锁的例子,另外一种情况是锁的所有权需要从一个域转到另一个域。


3.2.7 不同域中互斥量所有权的传递

std::unique_lock实例没有与自身相关的互斥量,一个互斥量的所有权可以通过移动 *** 作,在不同的实例中进行传递。


某些情况下,这种转移是自动发生的,例如:当函数返回一个实例;另些情况下,需要显式的调用std::move()来执行移动 *** 作。


从本质上来说,需要依赖于源值是否是左值——一个实际的值或是引用——或一个右值——一个临时类型。


当源值是一个右值,为了避免转移所有权过程出错,就必须显式移动成左值。


std::unique_lock是可移动,但不可赋值的类型。


附录AA.1.1节有更多与移动语句相关的信息。


一种使用可能是允许一个函数去锁住一个互斥量,并且将所有权移到调用者上,所以调用者可以在这个锁保护的范围内执行额外的动作。


下面的程序片段展示了:函数get_lock()锁住了互斥量,然后准备数据,返回锁的调用函数:

std::unique_lock get_lock()
{
  extern std::mutex some_mutex;
  std::unique_lock lk(some_mutex);
  prepare_data();
  return lk;  // 1
}
void process_data()
{
  std::unique_lock lk(get_lock());  // 2
  do_something();
}

lk在函数中被声明为自动变量,它不需要调用std::move(),可以直接返回(编译器负责调用移动构造函数)


process_data()函数直接转移std::unique_lock实例的所有权,调用do_something()可使用的正确数据(数据没有受到其他线程的修改)


通常这种模式会用于已锁的互斥量,其依赖于当前程序的状态,或依赖于传入返回类型为std::unique_lock的函数(或以参数返回)


这样的用法不会直接返回锁,不过网关类的一个数据成员可用来确认已经对保护数据的访问权限进行上锁。


这种情况下,所有的访问都必须通过网关类:当你想要访问数据,需要获取网关类的实例(如同前面的例子,通过调用get_lock()之类函数)来获取锁。


之后你就可以通过网关类的成员函数对数据进行访问。


当完成访问,可以销毁这个网关类对象,将锁进行释放,让别的线程来访问保护数据。


这样的一个网关类可能是可移动的(所以他可以从一个函数进行返回),在这种情况下锁对象的数据必须是可移动的。


std::unique_lock的灵活性同样也允许实例在销毁之前放弃其拥有的锁。


可以使用unlock()来做这件事,如同一个互斥量:std::unique_lock的成员函数提供类似于锁定和解锁互斥量的功能。


std::unique_lock实例在销毁前释放锁的能力,当锁没有必要在持有的时候,可以在特定的代码分支对其进行选择性的释放。


这对于应用性能来说很重要,因为持有锁的时间增加会导致性能下降,其他线程会等待这个锁的释放,避免超越 *** 作。


3.2.8 锁的粒度

3.2.3节中,已经对锁的粒度有所了解:锁的粒度是一个摆手术语(hand-waving term),用来描述通过一个锁保护着的数据量大小。


一个细粒度锁(a fine-grained lock)能够保护较小的数据量,一个粗粒度锁(a coarse-grained lock)能够保护较多的数据量。


选择粒度对于锁来说很重要,为了保护对应的数据,保证锁有能力保护这些数据也很重要。


我们都知道,在超市等待结账的时候,正在结账的顾客突然意识到他忘了拿蔓越莓酱,然后离开柜台去拿,并让其他的人都等待他回来;或者当收银员,准备收钱时,顾客才去翻钱包拿钱,这样的情况都会让等待的顾客很无奈。


当每个人都检查了自己要拿的东西,且能随时为拿到的商品进行支付,那么的每件事都会进行的很顺利。


这样的道理同样适用于线程:如果很多线程正在等待同一个资源(等待收银员对自己拿到的商品进行清点),当有线程持有锁的时间过长,这就会增加等待的时间(别等到结账的时候,才想起来蔓越莓酱没拿)


在可能的情况下,锁住互斥量的同时只能对共享数据进行访问;试图对锁外数据进行处理。


特别是做一些费时的动作,比如:对文件的输入/输出 *** 作进行上锁。


文件输入/输出通常要比从内存中读或写同样长度的数据慢成百上千倍,所以除非锁已经打算去保护对文件的访问,要么执行输入/输出 *** 作将会将延迟其他线程执行的时间,这很没有必要(因为文件锁阻塞住了很多 *** 作),这样多线程带来的性能效益会被抵消。


std::unique_lock在这种情况下工作正常,在调用unlock()时,代码不需要再访问共享数据;而后当再次需要对共享数据进行访问时,就可以再调用lock()了。


下面代码就是这样的一种情况:

void get_and_process_data()
{
  std::unique_lock my_lock(the_mutex);
  some_class data_to_process=get_next_data_chunk();
  my_lock.unlock();  // 1 不要让锁住的互斥量越过process()函数的调用
  result_type result=process(data_to_process);
  my_lock.lock(); // 2 为了写入数据,对互斥量再次上锁
  write_result(data_to_process,result);
}

不需要让锁住的互斥量越过对process()函数的调用,所以可以在函数调用前对互斥量手动解锁,并且在之后对其再次上锁


这能表示只有一个互斥量保护整个数据结构时的情况,不仅可能会有更多对锁的竞争,也会增加锁持锁的时间。


较多的 *** 作步骤需要获取同一个互斥量上的锁,所以持有锁的时间会更长。


成本上的双重打击也算是为向细粒度锁转移提供了双重激励和可能。


如同上面的例子,锁不仅是能锁住合适粒度的数据,还要控制锁的持有时间,以及什么 *** 作在执行的同时能够拥有锁。


一般情况下,执行必要的 *** 作时,尽可能将持有锁的时间缩减到最小。


这也就意味有一些浪费时间的 *** 作,比如:获取另外一个锁(即使你知道这不会造成死锁),或等待输入/输出 *** 作完成时没有必要持有一个锁(除非绝对需要)


清单3.63.9中,交换 *** 作需要锁住两个互斥量,其明确要求并发访问两个对象。


假设用来做比较的是一个简单的数据类型(比如:int类型),将会有什么不同么?int的拷贝很廉价,所以可以很容易的进行数据复制,并且每个被比较的对象都持有该对象的锁,在比较之后进行数据拷贝。


这就意味着,在最短时间内持有每个互斥量,并且你不会在持有一个锁的同时再去获取另一个。


下面的清单中展示了一个在这样情景中的Y类,并且展示了一个相等比较运算符的等价实现。


列表3.10 比较 *** 作符中一次锁住一个互斥量

class Y
{
private:
  int some_detail;
  mutable std::mutex m;
  int get_detail() const
  {
    std::lock_guard lock_a(m);  // 1
    return some_detail;
  }
public:
  Y(int sd):some_detail(sd){}

  friend bool operator==(Y const& lhs, Y const& rhs)
  {
    if(&lhs==&rhs)
      return true;
    int const lhs_value=lhs.get_detail();  // 2
    int const rhs_value=rhs.get_detail();  // 3
    return lhs_value==rhs_value;  // 4
  }
};

在这个例子中,比较 *** 作符首先通过调用get_detail()成员函数检索要比较的值②③,函数在索引值时被一个锁保护着


比较 *** 作符会在之后比较索引出来的值


注意:虽然这样能减少锁持有的时间,一个锁只持有一次(这样能消除死锁的可能性),这里有一个微妙的语义 *** 作同时对两个锁住的值进行比较。


列表3.10中,当 *** 作符返回true时,那就意味着在这个时间点上的lhs.some_detail与在另一个时间点的rhs.some_detail相同。


这两个值在读取之后,可能会被任意的方式所修改;两个值会在处进行交换,这样就会失去比较的意义。


等价比较可能会返回true,来表明这两个值时相等的,实际上这两个值相等的情况可能就发生在一瞬间。


这样的变化要小心,语义 *** 作是无法改变一个问题的比较方式:当你持有锁的时间没有达到整个 *** 作时间,就会让自己处于条件竞争的状态。


有时,只是没有一个合适粒度级别,因为并不是所有对数据结构的访问都需要同一级的保护。


这个例子中,就需要寻找一个合适的机制,去替换std::mutex


3.3 保护共享数据的替代设施

互斥量是最通用的机制,但其并非保护共享数据的唯一方式。


这里有很多替代方式可以在特定情况下,提供更加合适的保护。


一个特别极端(但十分常见)的情况就是,共享数据在并发访问和初始化时(都需要保护),但是之后需要进行隐式同步。


这可能是因为数据作为只读方式创建,所以没有同步问题;或者因为必要的保护作为对数据 *** 作的一部分,所以隐式的执行。


任何情况下,数据初始化后锁住一个互斥量,纯粹是为了保护其初始化过程(这是没有必要的),并且这会给性能带来不必要的冲击。


出于以上的原因,C++标准提供了一种纯粹保护共享数据初始化过程的机制。


3.3.1 保护共享数据的初始化过程

假设你与一个共享源,构建代价很昂贵,可能它会打开一个数据库连接或分配出很多的内存。


延迟初始化(Lazy initialization)在单线程代码很常见——每一个 *** 作都需要先对源进行检查,为了了解数据是否被初始化,然后在其使用前决定,数据是否需要初始化:

std::shared_ptr resource_ptr;
void foo()
{
  if(!resource_ptr)
  {
    resource_ptr.reset(new some_resource);  // 1
  }
  resource_ptr->do_something();
}

当共享数据对于并发访问是安全的,是转为多线程代码时,需要保护的,但是下面天真的转换会使得线程资源产生不必要的序列化。


这是因为每个线程必须等待互斥量,为了确定数据源已经初始化了。


清单 3.11 使用一个互斥量的延迟初始化(线程安全)过程

std::shared_ptr resource_ptr;
std::mutex resource_mutex;

void foo()
{
  std::unique_lock lk(resource_mutex);  // 所有线程在此序列化 
  if(!resource_ptr)
  {
    resource_ptr.reset(new some_resource);  // 只有初始化过程需要保护 
  }
  lk.unlock();
  resource_ptr->do_something();
}

这段代码相当常见了,也足够表现出没必要的线程化问题,很多人能想出更好的一些的办法来做这件事,包括声名狼藉的双重检查锁模式:

void undefined_behaviour_with_double_checked_locking()
{
  if(!resource_ptr)  // 1
  {
    std::lock_guard lk(resource_mutex);
    if(!resource_ptr)  // 2
    {
      resource_ptr.reset(new some_resource);  // 3
    }
  }
  resource_ptr->do_something();  // 4
}

指针第一次读取数据不需要获取锁,并且只有在指针为NULL时才需要获取锁。


然后,当获取锁之后,指针会被再次检查一遍 (这就是双重检查的部分),避免另一的线程在第一次检查后再做初始化,并且让当前线程获取锁。


这个模式为什么声名狼藉呢?因为这里有潜在的条件竞争,未被锁保护的读取 *** 作没有与其他线程里被锁保护的写入 *** 作进行同步。


因此就会产生条件竞争,这个条件竞争不仅覆盖指针本身,还会影响到其指向的对象;即使一个线程知道另一个线程完成对指针进行写入,它可能没有看到新创建的some_resource实例,然后调用do_something()后,得到不正确的结果。


这个例子是在一种典型的条件竞争——数据竞争,C++标准中这就会被指定为未定义行为


这种竞争肯定是可以避免的。


可以阅读第5章,那里有更多对内存模型的讨论,包括数据竞争的构成。


C++标准委员会也认为条件竞争的处理很重要,所以C++标准库提供了std::once_flagstd::call_once来处理这种情况。


比起锁住互斥量,并显式的检查指针,每个线程只需要使用std::call_once,在std::call_once的结束时,就能安全的知道指针已经被其他的线程初始化了。


使用std::call_once比显式使用互斥量消耗的资源更少,特别是当初始化完成后。


下面的例子展示了与清单3.11中的同样的 *** 作,这里使用了std::call_once


在这种情况下,初始化通过调用函数完成,同样这样 *** 作使用类中的函数 *** 作符来实现同样很简单。


如同大多数在标准库中的函数一样,或作为函数被调用,或作为参数被传递,std::call_once可以和任何函数或可调用对象一起使用。


std::shared_ptr resource_ptr;
std::once_flag resource_flag;  // 1

void init_resource()
{
  resource_ptr.reset(new some_resource);
}

void foo()
{
  std::call_once(resource_flag,init_resource);  // 可以完整的进行一次初始化
  resource_ptr->do_something();
}

在这个例子中,std::once_flag和初始化好的数据都是命名空间区域的对象,但是std::call_once()可仅作为延迟初始化的类型成员,如同下面的例子一样:

清单3.12 使用std::call_once作为类成员的延迟初始化(线程安全)

class X
{
private:
  connection_info connection_details;
  connection_handle connection;
  std::once_flag connection_init_flag;

  void open_connection()
  {
    connection=connection_manager.open(connection_details);
  }
public:
  X(connection_info const& connection_details_):
      connection_details(connection_details_)
  {}
  void send_data(data_packet const& data)  // 1
  {
    std::call_once(connection_init_flag,&X::open_connection,this);  // 2
    connection.send_data(data);
  }
  data_packet receive_data()  // 3
  {
    std::call_once(connection_init_flag,&X::open_connection,this);  // 2
    return connection.receive_data();
  }
};

例子中第一个调用send_data()receive_data()的线程完成初始化过程。


使用成员函数open_connection()去初始化数据,也需要将this指针传进去。


和其在在标准库中的函数一样,其接受可调用对象,比如std::thread的构造函数和std::bind(),通过向std::call_once()传递一个额外的参数来完成这个 *** 作。


值得注意的是,std::mutexstd::one_flag的实例就不能拷贝和移动,所以当你使用它们作为类成员函数,如果你需要用到他们,你就得显示定义这些特殊的成员函数。


还有一种情形的初始化过程中潜存着条件竞争:其中一个局部变量被声明为static类型。


这种变量的在声明后就已经完成初始化;对于多线程调用的函数,这就意味着这里有条件竞争——抢着去定义这个变量。


在很多在前C++11编译器(译者:不支持C++11标准的编译器),在实践过程中,这样的条件竞争是确实存在的,因为在多线程中,每个线程都认为他们是第一个初始化这个变量线程;或一个线程对变量进行初始化,而另外一个线程要使用这个变量时,初始化过程还没完成。


C++11标准中,这些问题都被解决了:初始化及定义完全在一个线程中发生,并且没有其他线程可在初始化完成前对其进行处理,条件竞争终止于初始化阶段,这样比在之后再去处理好的多。


在只需要一个全局实例情况下,这里提供一个std::call_once的替代方案

class my_class;
my_class& get_my_class_instance()
{
  static my_class instance;  // 线程安全的初始化过程
  return instance;
}

多线程可以安全的调用get_my_class_instance()函数,不用为数据竞争而担心。


对于很少有更新的数据结构来说,只在初始化时保护数据。


在大多数情况下,这种数据结构是只读的,并且多线程对其并发的读取也是很愉快的,不过一旦数据结构需要更新,就会产生竞争。


3.3.2 保护很少更新的数据结构

试想,为了将域名解析为其相关IP地址,我们在缓存中的存放了一张DNS入口表。


通常,给定DNS数目在很长的一段时间内保持不变。


虽然,在用户访问不同网站时,新的入口可能会被添加到表中,但是这些数据可能在其生命周期内保持不变。


所以定期检查缓存中入口的有效性,就变的十分重要了;但是,这也需要一次更新,也许这次更新只是对一些细节做了改动。


虽然更新频度很低,但更新也有可能发生,并且当这个可缓存被多个线程访问,这个缓存就需要处于更新状态时得到保护,这也为了确保每个线程读到都是有效数据。


没有使用专用数据结构时,这种方式是符合预期,并且为并发更新和读取特别设计的(更多的例子在第6和第7章中介绍)


这样的更新要求线程独占数据结构的访问权,直到其完成更新 *** 作。


当更新完成,数据结构对于并发多线程访问又会是安全的。


使用std::mutex来保护数据结构,显的有些反应过度(因为在没有发生修改时,它将削减并发读取数据的可能性)


这里需要另一种不同的互斥量,这种互斥量常被称为读者-作者锁,因为其允许两种不同的使用方式:一个作者线程独占访问和共享访问,让多个读者线程并发访问。


虽然这样互斥量的标准提案已经交给标准委员会,但是C++标准库依旧不会提供这样的互斥量[3]


因为建议没有被采纳,这个例子在本节中使用的是Boost库提供的实现(Boost采纳了这个建议)


你将在第8章中看到,这种锁的也不能包治百病,其性能依赖于参与其中的处理器数量,同样也与读者和作者线程的负载有关。


为了确保增加复杂度后还能获得性能收益,目标系统上的代码性能就很重要。


比起使用std::mutex实例进行同步,不如使用boost::shared_mutex来做同步。


对于更新 *** 作,可以使用std::lock_guardstd::unique_lock上锁。


作为std::mutex的替代方案,与std::mutex所做的一样,这就能保证更新线程的独占访问。


因为其他线程不需要去修改数据结构,所以其可以使用boost::shared_lock获取访问权。


这与使用std::unique_lock一样,除非多线程要在同时获取同一个boost::shared_mutex上有共享锁。


唯一的限制:当任一线程拥有一个共享锁时,这个线程就会尝试获取一个独占锁,直到其他线程放弃他们的锁;同样的,当任一线程拥有一个独占锁时,其他线程就无法获得共享锁或独占锁,直到第一个线程放弃其拥有的锁。


如同之前描述的那样,下面的代码清单展示了一个简单的DNS缓存,使用std::map持有缓存数据,使用boost::shared_mutex进行保护。


清单3.13 使用boost::shared_mutex对数据结构进行保护

#include 
#include 
#include 
#include 

class dns_entry;

class dns_cache
{
  std::map entries;
  mutable boost::shared_mutex entry_mutex;
public:
  dns_entry find_entry(std::string const& domain) const
  {
    boost::shared_lock lk(entry_mutex);  // 1
    std::map::const_iterator const it=
       entries.find(domain);
    return (it==entries.end())?dns_entry():it->second;
  }
  void update_or_add_entry(std::string const& domain,
                           dns_entry const& dns_details)
  {
    std::lock_guard lk(entry_mutex);  // 2
    entries[domain]=dns_details;
  }
};

清单3.13中,find_entry()使用boost::shared_lock<>来保护共享和只读权限;这就使得多线程可以同时调用find_entry(),且不会出错。


另一方面,update_or_add_entry()使用std::lock_guard<>实例,当表格需要更新时,为其提供独占访问权限;update_or_add_entry()函数调用时,独占锁会阻止其他线程对数据结构进行修改,并且阻止线程调用find_entry()


3.3.3 嵌套锁

当一个线程已经获取一个std::mutex(已经上锁),并对其再次上锁,这个 *** 作就是错误的,并且继续尝试这样做的话,就会产生未定义行为。


然而,在某些情况下,一个线程尝试获取同一个互斥量多次,而没有对其进行一次释放是可以的。


之所以可以,是因为C++标准库提供了std::recursive_mutex类。


其功能与std::mutex类似,除了你可以从同一线程的单个实例上获取多个锁。


互斥量锁住其他线程前,你必须释放你拥有的所有锁,所以当你调用lock()三次时,你也必须调用unlock()三次。


正确使用std::lock_guardstd::unique_lock可以帮你处理这些问题。


大多数情况下,当你需要嵌套锁时,就要对你的设计进行改动。


嵌套锁一般用在可并发访问的类上,所以其拥互斥量保护其成员数据。


每个公共成员函数都会对互斥量上锁,然后完成对应的功能,之后再解锁互斥量。


不过,有时成员函数会调用另一个成员函数,这种情况下,第二个成员函数也会试图锁住互斥量,这就会导致未定义行为的发生。


变通的解决方案会将互斥量转为嵌套锁,第二个成员函数就能成功的进行上锁,并且函数能继续执行。


但是,这样的使用方式是不推荐的,因为其过于草率,并且不合理。


特别是,当锁被持有时,对应类的不变量通常正在被修改。


这意味着,当不变量正在改变的时候,第二个成员函数还需要继续执行。


一个比较好的方式是,从中提取出一个函数作为类的私有成员,并且让其他成员函数都对其进行调用,这个私有成员函数不会对互斥量进行上锁(在调用前必须获得锁)


然后,你仔细考虑一下,在这种情况调用新函数时,数据的状态。


3.4 本章总结

本章讨论了当两个线程间的共享数据发生恶性条件竞争会带来多么严重的灾难,还讨论了如何使用std::mutex,和如何避免这些问题。


如你所见,互斥量并不是灵丹妙药,其还有自己的问题(比如:死锁),虽然C++标准库提供了一类工具来避免这些(例如:std::lock())


你还见识了一些用于避免死锁的先进技术,之后了解了锁所有权的转移,以及一些围绕如何选取适当粒度锁产生的问题。


最后,讨论了在具体情况下,数据保护的替代方案,例如:std::call_once()boost::shared_mutex


还有一个方面没有涉及到,那就是等待其他线程作为输入的情况。


我们的线程安全栈,仅是在栈为空时,抛出一个异常,所以当一个线程要等待其他线程向栈压入一个值时(这是一个线程安全栈的主要用途之一),它不得不多次尝试去d出一个值,当捕获抛出的异常时,再次进行尝试。


这种消耗资源的检查,没有任何意义。


并且,不断的检查会影响系统中其他线程的运行,这反而会妨碍程序的进展。


我们需要一些方法让一个线程等待其他线程完成任务,但在等待过程中不占用CPU


4章中,会去建立一些工具,用于保护共享数据,还会介绍一些线程同步 *** 作的机制;第6章中,如何构建更大型的可复用的数据类型。


第4章 同步并发 *** 作 4.1 等待一个事件或其他条件 4.2 限定等待事件 4.3 使用同步 *** 作简化代码 4.4 本章总结 第5章 C++内存模型和原子类型 *** 作 5.1 内存模型基础 5.2 C++中的原子 *** 作和原子模型 5.3 同步 *** 作和强制排序 5.4 本章总结 第6章 基于锁的并发数据结构设计 6.1 为并发设计的意义何在? 6.2 基于锁的并发数据结构 6.3 基于锁设计更加复杂的数据结构 6.4 本章总结 第7章 无锁并发数据结构设计 7.1 定义和意义 7.2 无锁数据结构的例子 7.3 对于设计无锁数据结构的指导 7.4 本章总结 第8章 并发代码设计 8.1 线程间划分工作的技术 8.2 如何让数据紧凑? 8.3 为多线程性能设计数据结构 8.4 设计并发代码的注意事项 8.5 在实践中设计并发代码 8.6 本章总结 第9章 高级线程管理 9.1 线程池 9.2 中断线程 9.3 本章总结 第10章 多线程程序的测试和调试 10.1 与并发相关的错误类型 10.2 定位并发错误的技术 10.3 本章总结

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/567415.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-09
下一篇 2022-04-09

发表评论

登录后才能评论

评论列表(0条)