2022-09-26 C++并发编程(二十六)

2022-09-26 C++并发编程(二十六),第1张

2022-09-26 C++并发编程(二十六)
  • 前言
  • 一、基于锁的线程安全栈
  • 二、基于锁和条件变量的线程安全队列
  • 总结


前言

前文介绍了 C++ 标准库中的并发库,现在让我们了解一下基于锁的并发数据结构。

并发安全的数据结构,其 *** 作要么是不用改变任何元素,比如读 *** 作,要么是改变元素时只让涉及改变的函数 *** 作数据,其它的则通过锁进行阻塞。

并发数据结构的设计有两个要点。

其一,是要安全,为了安全,需要通过锁等设施干预程序运行逻辑,使得某些 *** 作必须在一定条件下单线程运行。

其二,是效率,我们设计多线程并发代码的初衷,是榨取多核 CPU 的所有计算潜力,其基本逻辑是乱序的,但为了安全正确,需要加锁改变逻辑,当设计不合理时,我们就会创造一个由多核心运行的单路程序,还不如直接用单线程来的有效率。

所以为了平衡二者,需要精巧的设计。


一、基于锁的线程安全栈

以下是基于锁和标准库 std::stack 封装的线程安全栈,根据代码可见,所有 *** 作都是通过锁,全局锁定,安全是可以保证的,但效率是堪忧的。

#include 
#include 
#include 
#include 
#include 

struct emptyStack : std::exception
{
    [[nodiscard]] auto what() const noexcept -> const char * override
    {
        return "Stack is empty!";
    }
};

template <typename T>
class threadSafeStack
{
  private:
    std::stack<T> data;
    mutable std::mutex m;

  public:
    threadSafeStack() = default;

    threadSafeStack(const threadSafeStack &other)
    {
        std::lock_guard<std::mutex> lock(other.m);
        data = other.data;
    }

    auto operator=(const threadSafeStack &) -> threadSafeStack & = delete;

    void push(T new_value)
    {
        std::lock_guard<std::mutex> lock(m);
        data.push(std::move(new_value));
    }

    auto pop() -> std::shared_ptr<T>
    {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty())
        {
            throw emptyStack();
        }
        const std::shared_ptr<T> res(
            std::make_shared<T>(std::move(data.top())));
        data.pop();
        return res;
    }

    void pop(T &value)
    {
        std::lock_guard<std::mutex> lock(m);
        if (data.empty())
        {
            throw emptyStack();
        }
        value = std::move(data.top());
        data.pop();
    }

    auto empty() const -> bool
    {
        std::lock_guard<std::mutex> lock(m);
        return data.empty();
    }
};

auto main() -> int
{
    threadSafeStack<int> tss;
    tss.push(5);
    int val = 0;
    tss.pop(val);
 //   tss.push(5);
    auto ptrval = tss.pop();
    return 0;
}
二、基于锁和条件变量的线程安全队列

以下示例,队列由两种实现组成,tryPop 和 waitAndPop ,每种实现又有两种方式,传入引用参数的,返回 bool 值,告知是否值已经传递,无引用参数传递版本,则返回只能指针,如指针不空,则传递了值。

但 waitAndPop 的逻辑必须等待返回值,所以就算传入引用参数也无需返回 bool 值告知是否值已经返回,因为它必须返回,否则一直阻塞。

此种设计,兼顾了一定的安全以及并发效率。

#include 
#include 
#include 
#include 

template <typename T>
struct threadSafeQueue
{
    threadSafeQueue() = default;

    void push(T newValue)
    {
        std::lock_guard<std::mutex> lk(mut);
        dataQueue.push(std::move(newValue));
        dataCond.notify_one();
    }

    void waitAndPop(T &value)
    {
        std::unique_lock<std::mutex> lk(mut);
        dataCond.wait(lk, [this] { return !dataQueue.empty(); });
        value = std::move(dataQueue.front());
        dataQueue.pop();
    }

    auto waitAndPop() -> std::shared_ptr<T>
    {
        std::unique_lock<std::mutex> lk(mut);
        dataCond.wait(lk, [this] { return !dataQueue.empty(); });
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(dataQueue.front())));
        dataQueue.pop();
        return res;
    }

    auto tryPop(T &value) -> bool
    {
        std::lock_guard<std::mutex> lk(mut);
        if (dataQueue.empty())
        {
            return false;
        }
        value = std::move(dataQueue.front());
        dataQueue.pop();
        return true;
    }

    auto tryPop() -> std::shared_ptr<T>
    {
        std::lock_guard<std::mutex> lk(mut);
        if (dataQueue.empty())
        {
            return std::shared_ptr<T>();
        }
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(dataQueue.front())));
        dataQueue.pop();
        return res;
    }

    auto empty() const -> bool
    {
        std::lock_guard<std::mutex> lk(mut);
        return dataQueue.empty();
    }

  private:
    mutable std::mutex mut;
    std::queue<T> dataQueue;
    std::condition_variable dataCond;
};

auto main() -> int
{
    threadSafeQueue<int> tsq;

    tsq.push(3);
    auto val = tsq.tryPop();

    tsq.push(3);
    int valInt = 0;
    bool rst = tsq.tryPop(valInt);

    tsq.push(3);
    val = tsq.waitAndPop();

    tsq.push(3);
    tsq.waitAndPop(valInt);

    return 0;
}

并发设计永无止境,上面代码看似已经很好,但还是有异常安全问题。

下面是模拟如果构造 std::make_shared() 出现异常时的解决思路,防止因局部异常导致整体阻塞,无法运行。

    auto waitAndPop() -> std::shared_ptr<T>
    {
        std::unique_lock<std::mutex> lk(mut);
        dataCond.wait(lk, [this] { return !dataQueue.empty(); });
        try
        {
            std::shared_ptr<T> res(
                std::make_shared<T>(std::move(dataQueue.front())));

            if (*res == 5)
            {
                throw "exception";
            }
            dataQueue.pop();
            return res;
        }
        catch (...)
        {
            dataCond.notify_one();
            // dataQueue.pop();
        }
    }

还有一种,是将存储值全部用智能指针 std::shared_ptr 替代,在 push 时进行智能指针的构造。

#include 
#include 
#include 

template <typename T>
struct threadSafeQueue
{
    threadSafeQueue() = default;

    void push(T newValue)
    {
        std::shared_ptr<T> const data(std::make_shared<T>(std::move(newValue)));
        std::lock_guard<std::mutex> const lk(mut);
        dataQueue.push(data);
        dataCond.notify_one();
    }

    void waitAndPop(T &value)
    {
        std::unique_lock<std::mutex> lk(mut);
        dataCond.wait(lk, [this] { return !dataQueue.empty(); });
        value = std::move(*dataQueue.front());
        dataQueue.pop();
    }

    auto waitAndPop() -> std::shared_ptr<T>
    {
        std::unique_lock<std::mutex> lk(mut);
        dataCond.wait(lk, [this] { return !dataQueue.empty(); });
        std::shared_ptr<T> res = dataQueue.front();
        dataQueue.pop();
        return res;
    }

    auto tryPop(T &value) -> bool
    {
        std::lock_guard<std::mutex> const lk(mut);
        if (dataQueue.empty())
        {
            return false;
        }
        value = std::move(*dataQueue.front());
        dataQueue.pop();
        return true;
    }

    auto tryPop() -> std::shared_ptr<T>
    {
        std::lock_guard<std::mutex> const lk(mut);
        if (dataQueue.empty())
        {
            return std::shared_ptr<T>();
        }
        std::shared_ptr<T> res = dataQueue.front();
        dataQueue.pop();
        return res;
    }

    auto empty() const -> bool
    {
        std::lock_guard<std::mutex> lk(mut);
        return dataQueue.empty();
    }

  private:
    mutable std::mutex mut;
    std::queue<std::shared_ptr<T>> dataQueue;
    std::condition_variable dataCond;
};

auto main() -> int
{
    threadSafeQueue<int> tsq;

    tsq.push(3);
    auto val = tsq.tryPop();

    tsq.push(3);
    int valInt = 0;
    bool const rst = tsq.tryPop(valInt);

    tsq.push(3);
    val = tsq.waitAndPop();

    tsq.push(3);
    tsq.waitAndPop(valInt);

    return 0;
}

总结

适应并发编程的数据结构设计,需要考虑的事情非常之多,往往考验的是安全和效率的平衡拿捏,让可以乱序执行的乱序执行,不可以乱序执行的顺序执行,看似简单,实则不出纰漏的实现,非常之难。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/3002156.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-09-27
下一篇 2022-09-27

发表评论

登录后才能评论

评论列表(0条)

保存