首页 文章

与线程相关的主动对象设计问题(c boost)

提问于
浏览
1

我想要一些关于下面列出的IService类的反馈 . 据我所知,这种类与“活动对象”模式有关 . 如果我错误地使用任何相关术语,请原谅/更正 . 基本上这个想法是使用这个活动对象类的类需要提供一个控制某些事件循环的start和stop方法 . 此事件循环可以使用while循环或使用boost asio等实现 .

该类负责以非阻塞方式启动新线程,以便可以在新线程中处理事件 . 它还必须处理所有与清理相关的代码 . 我首先尝试了一种OO方法,其中子类负责覆盖控制事件循环的方法但是清理很乱:在析构函数中调用stop方法导致在调用类没有手动调用的情况下进行纯虚函数调用停止方法 . 模板化的解决方案似乎更清洁:

template <typename T>
class IService : private boost::noncopyable
{
    typedef boost::shared_ptr<boost::thread> thread_ptr;
public:

  IService()
  {
  }

  ~IService()
  {
    /// try stop the service in case it's running
    stop();
  }

  void start()
  {
    boost::mutex::scoped_lock lock(m_threadMutex);

    if (m_pServiceThread && m_pServiceThread->joinable())
    {
      // already running
      return;
    }

    m_pServiceThread = thread_ptr(new boost::thread(boost::bind(&IService::main, this)));

    // need to wait for thread to start: else if destructor is called before thread has started

    // Wait for condition to be signaled and then
    // try timed wait since the application could deadlock if the thread never starts?
    //if (m_startCondition.timed_wait(m_threadMutex, boost::posix_time::milliseconds(getServiceTimeoutMs())))
    //{
    //}
    m_startCondition.wait(m_threadMutex);

    // notify main to continue: it's blocked on the same condition var
    m_startCondition.notify_one();
  }

  void stop()
  {
    // trigger the stopping of the event loop
    m_serviceObject.stop();

    if (m_pServiceThread)
    {
      if (m_pServiceThread->joinable())
      {
        m_pServiceThread->join();
      }
      // the service is stopped so we can reset the thread
      m_pServiceThread.reset();
    }
  }

private:
  /// entry point of thread
  void main()
  {
    boost::mutex::scoped_lock lock(m_threadMutex);
    // notify main thread that it can continue
    m_startCondition.notify_one();

    // Try Dummy wait to allow 1st thread to resume???
    m_startCondition.wait(m_threadMutex);

    // call template implementation of event loop
    m_serviceObject.start();
  }

  /// Service thread
  thread_ptr m_pServiceThread;
  /// Thread mutex
  mutable boost::mutex m_threadMutex;
  /// Condition for signaling start of thread
  boost::condition m_startCondition;

  /// T must satisfy the implicit service interface and provide a start and a stop method
  T m_serviceObject;
};

该类可以使用如下:

class TestObject3
{
public:
  TestObject3()
      :m_work(m_ioService),
      m_timer(m_ioService, boost::posix_time::milliseconds(200))
  {
      m_timer.async_wait(boost::bind(&TestObject3::doWork, this, boost::asio::placeholders::error));
  }

  void start()
  {
      // simple event loop
      m_ioService.run();
  }

  void stop()
  {
      // signal end of event loop
      m_ioService.stop();
  }

  void doWork(const boost::system::error_code& e)
  {
      // Do some work here
      if (e != boost::asio::error::operation_aborted)
      {
      m_timer.expires_from_now( boost::posix_time::milliseconds(200) );
      m_timer.async_wait(boost::bind(&TestObject3::doWork, this, boost::asio::placeholders::error));
      }
  }

private:
  boost::asio::io_service m_ioService;
  boost::asio::io_service::work m_work;
  boost::asio::deadline_timer m_timer;
};

现在我的具体问题:

1)使用boost条件变量是否正确?对我来说这似乎有点像黑客:我想等待线程被启动所以我等待条件变量 . 然后,一旦在main方法中启动了新线程,我再次等待相同的条件变量以允许初始线程继续 . 然后,一旦退出初始线程的start方法,新线程就可以继续 . 这个可以吗?

2)是否存在操作系统无法成功启动线程的情况?我记得在某处读到这可能发生 . 如果这是可能的,我宁愿对条件变量进行定时等待(如在start方法中注释掉的那样)?

3)我知道模板化的类无法“正确”实现stop方法,即如果事件循环未能停止,代码将阻塞连接(在停止或析构函数中),但我看不出办法围绕这个 . 我想这是由类的用户确保启动和停止方法正确实现?

4)我会感谢任何其他设计错误,改进等?

谢谢!

1 回答

  • 0

    最后解决了以下问题:

    1)经过多次测试后,使用条件变量似乎很好

    2)这个问题没有出现(尚未)

    3)模板化的类实现必须满足要求,单元测试用于测试正确性

    4)改进

    • 添加了锁定加入

    • 在生成的线程中捕获异常并在主线程中重新抛出以避免崩溃并且不会丢失异常信息

    • 使用boost :: system :: error_code将错误代码传回给调用者

    • 实现对象是可设置的

    码:

    template <typename T>
    class IService : private boost::noncopyable
    {
      typedef boost::shared_ptr<boost::thread> thread_ptr;
      typedef T ServiceImpl;
    public:
      typedef boost::shared_ptr<IService<T> > ptr;
    
      IService()
        :m_pServiceObject(&m_serviceObject)
      {
      }
    
      ~IService()
      {
        /// try stop the service in case it's running
        if (m_pServiceThread && m_pServiceThread->joinable())
        {
          stop();
        }
      }
    
      static ptr create()
      {
        return boost::make_shared<IService<T> >();
      }
    
      /// Accessor to service implementation. The handle can be used to configure the implementation object
      ServiceImpl& get() { return m_serviceObject; }
      /// Mutator to service implementation. The handle can be used to configure the implementation object
      void set(ServiceImpl rServiceImpl)
      {
        // the implementation object cannot be modified once the thread has been created
        assert(m_pServiceThread == 0);
        m_serviceObject = rServiceImpl;
        m_pServiceObject = &m_serviceObject;
      }
    
      void set(ServiceImpl* pServiceImpl)
      {
        // the implementation object cannot be modified once the thread has been created
        assert(m_pServiceThread == 0);
    
        // make sure service object is valid
        if (pServiceImpl)
          m_pServiceObject = pServiceImpl; 
      }
    
      /// if the service implementation reports an error from the start or stop method call, it can be accessed via this method
      /// NB: only the last error can be accessed
      boost::system::error_code getServiceErrorCode() const { return m_ecService; }
    
      /// The join method allows the caller to block until thread completion
      void join()
      {
        // protect this method from being called twice (e.g. by user and by stop)
        boost::mutex::scoped_lock lock(m_joinMutex);
        if (m_pServiceThread && m_pServiceThread->joinable())
        {
          m_pServiceThread->join();
          m_pServiceThread.reset();
        }
      }
    
      /// This method launches the non-blocking service
      boost::system::error_code start()
      {
        boost::mutex::scoped_lock lock(m_threadMutex);
    
        if (m_pServiceThread && m_pServiceThread->joinable())
        {
          // already running
          return boost::system::error_code(SHARED_INVALID_STATE, shared_category);
        }
    
        m_pServiceThread = thread_ptr(new boost::thread(boost::bind(&IService2::main, this)));
        // Wait for condition to be signaled
        m_startCondition.wait(m_threadMutex);
    
        // notify main to continue: it's blocked on the same condition var
        m_startCondition.notify_one();
        // No error
        return boost::system::error_code();
      }
    
      /// This method stops the non-blocking service
      boost::system::error_code stop()
      {
        // trigger the stopping of the event loop
        //boost::system::error_code ec = m_serviceObject.stop();
        assert(m_pServiceObject);
        boost::system::error_code ec = m_pServiceObject->stop();
        if (ec)
        {
          m_ecService = ec;
          return ec;
        }
    
        // The service implementation can return an error code here for more information
        // However it is the responsibility of the implementation to stop the service event loop (if running)
        // Failure to do so, will result in a block
        // If this occurs in practice, we may consider a timed join?
        join();
    
        // If exception was thrown in new thread, rethrow it.
        // Should the template implementation class want to avoid this, it should catch the exception
        // in its start method and then return and error code instead
        if( m_exception )
          boost::rethrow_exception(m_exception);
    
        return ec;
      }
    
    private:
      /// runs in it's own thread
      void main()
      {
        try
        {
          boost::mutex::scoped_lock lock(m_threadMutex);
          // notify main thread that it can continue
          m_startCondition.notify_one();
          // Try Dummy wait to allow 1st thread to resume
          m_startCondition.wait(m_threadMutex);
    
          // call implementation of event loop
          // This will block
          // In scenarios where the service fails to start, the implementation can return an error code
          m_ecService = m_pServiceObject->start();
    
          m_exception = boost::exception_ptr();
        } 
        catch (...)
        {
          m_exception = boost::current_exception();
        }
      }
    
      /// Service thread
      thread_ptr m_pServiceThread;
      /// Thread mutex
      mutable boost::mutex m_threadMutex;
      /// Join mutex
      mutable boost::mutex m_joinMutex;
      /// Condition for signaling start of thread
      boost::condition m_startCondition;
    
      /// T must satisfy the implicit service interface and provide a start and a stop method
      T m_serviceObject;
      T* m_pServiceObject;
      // Error code for service implementation errors
      boost::system::error_code m_ecService;
    
      // Exception ptr to transport exception across different threads
      boost::exception_ptr m_exception;
    };
    

    当然,欢迎进一步的反馈/批评 .

相关问题