我有一个程序产生多个线程,每个线程执行一个长时间运行的任务 . 然后主线程等待所有工作线程加入,收集结果并退出 .
如果其中一个工作程序发生错误,我希望其余的工作程序正常停止,以便主线程可以在不久之后退出 .
我的问题是如何最好地执行此操作,当长期运行的任务的实现由我的代码无法修改的库提供时 .
这是系统的简单草图,没有错误处理:
void threadFunc()
{
// Do long-running stuff
}
void mainFunc()
{
std::vector<std::thread> threads;
for (int i = 0; i < 3; ++i) {
threads.push_back(std::thread(&threadFunc));
}
for (auto &t : threads) {
t.join();
}
}
如果长时间运行的函数执行循环并且我可以访问代码,那么只需通过检查每次迭代顶部的共享“keep on running”标志就可以中止执行 .
std::mutex mutex;
bool error;
void threadFunc()
{
try {
for (...) {
{
std::unique_lock<std::mutex> lock(mutex);
if (error) {
break;
}
}
}
} catch (std::exception &) {
std::unique_lock<std::mutex> lock(mutex);
error = true;
}
}
现在考虑一下库提供长时间运行的情况:
std::mutex mutex;
bool error;
class Task
{
public:
// Blocks until completion, error, or stop() is called
void run();
void stop();
};
void threadFunc(Task &task)
{
try {
task.run();
} catch (std::exception &) {
std::unique_lock<std::mutex> lock(mutex);
error = true;
}
}
在这种情况下,主线程必须处理错误,并在仍在运行的任务上调用 stop()
. 因此,它不能像原始实现那样简单地等待每个工作者 join()
.
到目前为止我使用的方法是在主线程和每个worker之间共享以下结构:
struct SharedData
{
std::mutex mutex;
std::condition_variable condVar;
bool error;
int running;
}
当 Worker 成功完成时,它会减少 running
计数 . 如果捕获到异常,则worker将设置 error
标志 . 在这两种情况下,它都会调用 condVar.notify_one()
.
然后主线程等待条件变量,如果设置 error
或 running
达到零,则唤醒 . 在唤醒时,如果设置了 error
,则主线程会在所有任务上调用 stop()
.
这种方法有效,但我觉得应该有一个更清晰的解决方案,使用标准并发库中的一些更高级的原语 . 有人可以建议改进实施吗?
以下是我当前解决方案的完整代码:
// main.cpp
#include <chrono>
#include <mutex>
#include <thread>
#include <vector>
#include "utils.h"
// Class which encapsulates long-running task, and provides a mechanism for aborting it
class Task
{
public:
Task(int tidx, bool fail)
: tidx(tidx)
, fail(fail)
, m_run(true)
{
}
void run()
{
static const int NUM_ITERATIONS = 10;
for (int iter = 0; iter < NUM_ITERATIONS; ++iter) {
{
std::unique_lock<std::mutex> lock(m_mutex);
if (!m_run) {
out() << "thread " << tidx << " aborting";
break;
}
}
out() << "thread " << tidx << " iter " << iter;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (fail) {
throw std::exception();
}
}
}
void stop()
{
std::unique_lock<std::mutex> lock(m_mutex);
m_run = false;
}
const int tidx;
const bool fail;
private:
std::mutex m_mutex;
bool m_run;
};
// Data shared between all threads
struct SharedData
{
std::mutex mutex;
std::condition_variable condVar;
bool error;
int running;
SharedData(int count)
: error(false)
, running(count)
{
}
};
void threadFunc(Task &task, SharedData &shared)
{
try {
out() << "thread " << task.tidx << " starting";
task.run(); // Blocks until task completes or is aborted by main thread
out() << "thread " << task.tidx << " ended";
} catch (std::exception &) {
out() << "thread " << task.tidx << " failed";
std::unique_lock<std::mutex> lock(shared.mutex);
shared.error = true;
}
{
std::unique_lock<std::mutex> lock(shared.mutex);
--shared.running;
}
shared.condVar.notify_one();
}
int main(int argc, char **argv)
{
static const int NUM_THREADS = 3;
std::vector<std::unique_ptr<Task>> tasks(NUM_THREADS);
std::vector<std::thread> threads(NUM_THREADS);
SharedData shared(NUM_THREADS);
for (int tidx = 0; tidx < NUM_THREADS; ++tidx) {
const bool fail = (tidx == 1);
tasks[tidx] = std::make_unique<Task>(tidx, fail);
threads[tidx] = std::thread(&threadFunc, std::ref(*tasks[tidx]), std::ref(shared));
}
{
std::unique_lock<std::mutex> lock(shared.mutex);
// Wake up when either all tasks have completed, or any one has failed
shared.condVar.wait(lock, [&shared](){
return shared.error || !shared.running;
});
if (shared.error) {
out() << "error occurred - terminating remaining tasks";
for (auto &t : tasks) {
t->stop();
}
}
}
for (int tidx = 0; tidx < NUM_THREADS; ++tidx) {
out() << "waiting for thread " << tidx << " to join";
threads[tidx].join();
out() << "thread " << tidx << " joined";
}
out() << "program complete";
return 0;
}
这里定义了一些实用程序函数:
// utils.h
#include <iostream>
#include <mutex>
#include <thread>
#ifndef UTILS_H
#define UTILS_H
#if __cplusplus <= 201103L
// Backport std::make_unique from C++14
#include <memory>
namespace std {
template<typename T, typename ...Args>
std::unique_ptr<T> make_unique(
Args&& ...args)
{
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}
} // namespace std
#endif // __cplusplus <= 201103L
// Thread-safe wrapper around std::cout
class ThreadSafeStdOut
{
public:
ThreadSafeStdOut()
: m_lock(m_mutex)
{
}
~ThreadSafeStdOut()
{
std::cout << std::endl;
}
template <typename T>
ThreadSafeStdOut &operator<<(const T &obj)
{
std::cout << obj;
return *this;
}
private:
static std::mutex m_mutex;
std::unique_lock<std::mutex> m_lock;
};
std::mutex ThreadSafeStdOut::m_mutex;
// Convenience function for performing thread-safe output
ThreadSafeStdOut out()
{
return ThreadSafeStdOut();
}
#endif // UTILS_H
3 回答
您的问题是长时间运行的功能不是您的代码,并且您说您无法修改它 . 因此,除非库开发人员为您完成此操作,否则您无法对任何类型的外部同步原语(条件变量,信号量,互斥量,管道等)进行任何关注 .
因此,你唯一的选择就是做一些能够控制远离任何代码的东西,无论它是什么pthread_kill(),或者这些天的等价物 .
模式就是这样
检测到错误的线程需要以某种方式将该错误传回主线程 .
主线程然后需要为所有其他剩余线程调用pthread_kill() . 唐't be confused by the name - pthread_kill() is simply a way of delivering an arbitrary signal to a thread. Note that signals like STOP, CONTINUE and TERMINATE are process-wide even if raised with pthread_kill(), not thread specific so don'吨使用那些 .
在每个线程中,您都需要一个信号处理程序 . 在将信号传递给线程时,无论长时间运行的函数在做什么,该线程中的执行路径都将跳转到处理程序 .
你现在回到(有限的)控制中,可以(可能,好吧,也许)做一些有限的清理并终止线程 .
与此同时,主线程将在它发出信号的所有线程上调用pthread_join(),现在它们将返回 .
我的想法:
这是一种非常丑陋的做法(信号/ pthreads很难说得对,我真的看到你有什么其他的选择 .
在源代码中
它'll be a long way from looking '优雅',尽管最终用户体验会很好 .
您将通过运行该库函数中途执行中断,因此如果完成's any clean up it would normally do (e.g. freeing up memory it has allocated) that won' t并且您将发生内存泄漏 . 如果发生这种情况,在像valgrind这样的东西下运行是一种解决问题的方法 .
获取库函数进行清理的唯一方法(如果需要)将由您的信号处理程序将控制权返回给函数并让它运行完成,这就是您不想做的事情 .
当然,这不适用于Windows(没有pthreads,至少没有人值得一提,尽管可能有相同的机制) .
真正最好的方法是重新实现(如果可能的话)库函数 .
我一直在考虑你的情况,这可能对你有所帮助 . 您可以尝试使用几种不同的方法来实现目标 . 有2-3个选项可能使用或三者兼而有之 . 我将至少展示第一个选项,我仍在学习并尝试掌握模板专业化的概念以及使用Lambdas .
使用Manager类
使用模板专用化封装
使用Lambdas .
Manager类的伪代码看起来像这样:
仅出于演示目的,我使用bool值来确定线程是否因结构简单而失败,当然,如果您更喜欢使用异常或无效的无符号值等,则可以将其替换为您的类似等 .
现在要使用这种类了类似这样的事情:还要注意,如果它是Singleton类型的对象,那么这种类的类会被认为更好,因为你不需要超过1个ManagerClass,因为你正在使用共享指针 .
我喜欢管理器类的设计,因为我已经在其他项目中使用它们,并且它们经常派上用场,尤其是在使用包含许多资源的代码库时,例如具有许多资产的工作游戏引擎,如Sprites,纹理,音频文件, Map ,游戏项目等 . 使用管理器类有助于跟踪和维护所有资产 . 同样的概念可以应用于“管理”活动,非活动,等待线程,并且知道如何正确地直观地处理和关闭所有线程 . 如果您的代码库和库支持异常以及线程安全异常处理而不是传递和使用bool进行错误,我建议使用ExceptionHandler . 还有一个Logger类,它可以写入日志文件和/或控制台窗口,以显示抛出异常的函数以及导致日志消息可能如下所示的异常的显式消息:
通过这种方式,您可以查看日志文件并快速找出导致异常的线程,而不是使用传递的bool变量 .
The implementation of the long-running task is provided by a library whose code I cannot modify.
这意味着您无法同步工作线程完成的作业
If an error occurs in one of the workers,
我们假设您可以真正发现 Worker 错误;如果使用过的库报告其他人不能,则可以很容易地检测到其中一些
库代码循环 .
库代码过早地以未捕获的异常退出 .
I want the remaining workers to stop **gracefully**
那是不可能的
您可以做的最好的事情是编写一个线程管理器来检查工作线程状态,如果检测到错误条件,它只会(非正常地)“杀死”所有工作线程并退出 .
您还应该考虑检测循环工作线程(通过超时)并向用户提供kill或继续等待进程完成的选项 .