- 前言
- 一、基于锁的线程安全栈
- 二、基于锁和条件变量的线程安全队列
- 总结
前言
前文介绍了 C++ 标准库中的并发库,现在让我们了解一下基于锁的并发数据结构。
并发安全的数据结构,其 *** 作要么是不用改变任何元素,比如读 *** 作,要么是改变元素时只让涉及改变的函数 *** 作数据,其它的则通过锁进行阻塞。
并发数据结构的设计有两个要点。
其一,是要安全,为了安全,需要通过锁等设施干预程序运行逻辑,使得某些 *** 作必须在一定条件下单线程运行。
其二,是效率,我们设计多线程并发代码的初衷,是榨取多核 CPU 的所有计算潜力,其基本逻辑是乱序的,但为了安全正确,需要加锁改变逻辑,当设计不合理时,我们就会创造一个由多核心运行的单路程序,还不如直接用单线程来的有效率。
所以为了平衡二者,需要精巧的设计。
一、基于锁的线程安全栈
以下是基于锁和标准库 std::stack 封装的线程安全栈,根据代码可见,所有 *** 作都是通过锁,全局锁定,安全是可以保证的,但效率是堪忧的。
#include
#include
#include
#include
#include
struct emptyStack : std::exception
{
[[nodiscard]] auto what() const noexcept -> const char * override
{
return "Stack is empty!";
}
};
template <typename T>
class threadSafeStack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadSafeStack() = default;
threadSafeStack(const threadSafeStack &other)
{
std::lock_guard<std::mutex> lock(other.m);
data = other.data;
}
auto operator=(const threadSafeStack &) -> threadSafeStack & = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}
auto pop() -> std::shared_ptr<T>
{
std::lock_guard<std::mutex> lock(m);
if (data.empty())
{
throw emptyStack();
}
const std::shared_ptr<T> res(
std::make_shared<T>(std::move(data.top())));
data.pop();
return res;
}
void pop(T &value)
{
std::lock_guard<std::mutex> lock(m);
if (data.empty())
{
throw emptyStack();
}
value = std::move(data.top());
data.pop();
}
auto empty() const -> bool
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
auto main() -> int
{
threadSafeStack<int> tss;
tss.push(5);
int val = 0;
tss.pop(val);
// tss.push(5);
auto ptrval = tss.pop();
return 0;
}
二、基于锁和条件变量的线程安全队列
以下示例,队列由两种实现组成,tryPop 和 waitAndPop ,每种实现又有两种方式,传入引用参数的,返回 bool 值,告知是否值已经传递,无引用参数传递版本,则返回只能指针,如指针不空,则传递了值。
但 waitAndPop 的逻辑必须等待返回值,所以就算传入引用参数也无需返回 bool 值告知是否值已经返回,因为它必须返回,否则一直阻塞。
此种设计,兼顾了一定的安全以及并发效率。
#include
#include
#include
#include
template <typename T>
struct threadSafeQueue
{
threadSafeQueue() = default;
void push(T newValue)
{
std::lock_guard<std::mutex> lk(mut);
dataQueue.push(std::move(newValue));
dataCond.notify_one();
}
void waitAndPop(T &value)
{
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk, [this] { return !dataQueue.empty(); });
value = std::move(dataQueue.front());
dataQueue.pop();
}
auto waitAndPop() -> std::shared_ptr<T>
{
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk, [this] { return !dataQueue.empty(); });
std::shared_ptr<T> res(
std::make_shared<T>(std::move(dataQueue.front())));
dataQueue.pop();
return res;
}
auto tryPop(T &value) -> bool
{
std::lock_guard<std::mutex> lk(mut);
if (dataQueue.empty())
{
return false;
}
value = std::move(dataQueue.front());
dataQueue.pop();
return true;
}
auto tryPop() -> std::shared_ptr<T>
{
std::lock_guard<std::mutex> lk(mut);
if (dataQueue.empty())
{
return std::shared_ptr<T>();
}
std::shared_ptr<T> res(
std::make_shared<T>(std::move(dataQueue.front())));
dataQueue.pop();
return res;
}
auto empty() const -> bool
{
std::lock_guard<std::mutex> lk(mut);
return dataQueue.empty();
}
private:
mutable std::mutex mut;
std::queue<T> dataQueue;
std::condition_variable dataCond;
};
auto main() -> int
{
threadSafeQueue<int> tsq;
tsq.push(3);
auto val = tsq.tryPop();
tsq.push(3);
int valInt = 0;
bool rst = tsq.tryPop(valInt);
tsq.push(3);
val = tsq.waitAndPop();
tsq.push(3);
tsq.waitAndPop(valInt);
return 0;
}
并发设计永无止境,上面代码看似已经很好,但还是有异常安全问题。
下面是模拟如果构造 std::make_shared() 出现异常时的解决思路,防止因局部异常导致整体阻塞,无法运行。
auto waitAndPop() -> std::shared_ptr<T>
{
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk, [this] { return !dataQueue.empty(); });
try
{
std::shared_ptr<T> res(
std::make_shared<T>(std::move(dataQueue.front())));
if (*res == 5)
{
throw "exception";
}
dataQueue.pop();
return res;
}
catch (...)
{
dataCond.notify_one();
// dataQueue.pop();
}
}
还有一种,是将存储值全部用智能指针 std::shared_ptr
#include
#include
#include
template <typename T>
struct threadSafeQueue
{
threadSafeQueue() = default;
void push(T newValue)
{
std::shared_ptr<T> const data(std::make_shared<T>(std::move(newValue)));
std::lock_guard<std::mutex> const lk(mut);
dataQueue.push(data);
dataCond.notify_one();
}
void waitAndPop(T &value)
{
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk, [this] { return !dataQueue.empty(); });
value = std::move(*dataQueue.front());
dataQueue.pop();
}
auto waitAndPop() -> std::shared_ptr<T>
{
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk, [this] { return !dataQueue.empty(); });
std::shared_ptr<T> res = dataQueue.front();
dataQueue.pop();
return res;
}
auto tryPop(T &value) -> bool
{
std::lock_guard<std::mutex> const lk(mut);
if (dataQueue.empty())
{
return false;
}
value = std::move(*dataQueue.front());
dataQueue.pop();
return true;
}
auto tryPop() -> std::shared_ptr<T>
{
std::lock_guard<std::mutex> const lk(mut);
if (dataQueue.empty())
{
return std::shared_ptr<T>();
}
std::shared_ptr<T> res = dataQueue.front();
dataQueue.pop();
return res;
}
auto empty() const -> bool
{
std::lock_guard<std::mutex> lk(mut);
return dataQueue.empty();
}
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> dataQueue;
std::condition_variable dataCond;
};
auto main() -> int
{
threadSafeQueue<int> tsq;
tsq.push(3);
auto val = tsq.tryPop();
tsq.push(3);
int valInt = 0;
bool const rst = tsq.tryPop(valInt);
tsq.push(3);
val = tsq.waitAndPop();
tsq.push(3);
tsq.waitAndPop(valInt);
return 0;
}
总结
适应并发编程的数据结构设计,需要考虑的事情非常之多,往往考验的是安全和效率的平衡拿捏,让可以乱序执行的乱序执行,不可以乱序执行的顺序执行,看似简单,实则不出纰漏的实现,非常之难。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)