首页 文章

推荐使用TPL用于非常长的生命线程

提问于
浏览
2

我已经阅读了一些关于任务并行库(http://msdn.microsoft.com/en-us/library/dd537609(v=vs.110).aspx)的MSDN文档,特别是关于TPL的最佳实践用法 .

我有一个启动线程的应用程序 . 该线程的目的是监视队列并“处理”已添加的项目 . 队列中项目的处理需要按顺序进行,因此我不希望从队列中一次处理多个项目 . 只要windows进程,该线程就会存在,并且仅在应用程序退出时关闭 .

我想知道使用TPL将此后台线程作为任务启动的优缺点 .

我最初的直觉是不使用TPL,因为我会在正在运行的应用程序的整个生命周期中占用一个线程池线程,这可能会阻止其他任务运行并干扰线程池的最佳运行 .

我不确定如何手动启动新的后台线程来执行工作,会影响应用程序其他独立部分中TPL的任何使用 .

我想知道在这种情况下,手动线程或任务是推荐的方法吗?或者我需要考虑哪些其他考虑因素才能做出明智的选择?

我已经包含了下面的代码 . 请注意,代码中的“处理器”通常执行CPU绑定操作,然后“处理程序”通常执行IO绑定操作:

public class TransactionProcessor : IDisposable
{
    private readonly IProcessorConfiguration configuration;
    private readonly IList<Tuple<string, IProcessor>> processors = new List<Tuple<string, IProcessor>>();
    private readonly IList<Tuple<string, IHandler>> handlers = new List<Tuple<string, IHandler>>(); 
    private AutoResetEvent waitForWork = new AutoResetEvent(true);
    private object lockObject = new object();
    private bool processThreadShouldExit = false;
    private Thread processorThread; 
    private Queue queue = new Queue();

    public TransactionProcessor(IProcessorConfiguration configuration)
    {
        if (configuration == null)
        {
            throw new ArgumentNullException("configuration");
        }

        this.configuration = configuration;
        this.Initialise();
    }

    public void Start()
    {
        lock (this.lockObject)
        {
            if (this.processorThread == null)
            {
                this.processThreadShouldExit = false;
                this.processorThread = new Thread(this.Dispatcher);
                this.processorThread.Start();
            }
        }
    }

    public void Stop()
    {
        if (this.processorThread != null)
        {
            this.processThreadShouldExit = true;
            this.waitForWork.Set();
            this.processorThread.Join();
            this.processorThread = null;
        }
    }   

    public void QueueTransactionForProcessing(Transaction Transaction, Guid clientID)
    {
        var queueObject = new QueueObject() { Transaction = Transaction };

        lock (this.lockObject)
        {
            this.queue.Enqueue(queueObject);
        }

        if (this.waitForWork != null)
        {
            this.waitForWork.Set();
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (this.processorThread != null)
        {
            this.Stop();
        }

        if (this.waitForWork != null)
        {
            this.waitForWork.Dispose();
            this.waitForWork = null;
        }
    }

    private void Dispatcher()
    {
        if (this.queue.Count == 0)
        {
            this.waitForWork.Reset();
        }

        while (!this.processThreadShouldExit)
        {
            if (this.queue.Count > 0 || this.waitForWork.WaitOne(60000))
            {
                while (this.queue.Count > 0 && !this.processThreadShouldExit)
                {
                    QueueObject queueObject;
                    lock (this.lockObject)
                    {
                        queueObject = (QueueObject)this.queue.Dequeue();
                    }

                    this.ProcessQueueItem(queueObject);
                }

                if (this.queue.Count == 0)
                {
                    this.waitForWork.Reset();
                }
            }
        }
    }   

    private void ProcessQueueItem(QueueObject item)
    {
        var correlationId = Guid.NewGuid();
        try
        {
            bool continuePipeline = true;
            foreach (var processor in this.processors)
            {
                processor.Item2.Process(item.Transaction, correlationId, ref continuePipeline);
                if (!continuePipeline)
                {
                    break;
                }
            }

            if (continuePipeline)
            {
                foreach (var handler in this.handlers)
                {
                    Transaction clonedTransaction = item.Transaction.Clone();
                    try
                    {
                        handler.Item2.Handle(clonedTransaction, correlationId);
                    }
                    catch (Exception e)
                    {
                    }
                }
            }
        }
        catch (Exception e)
        {               
        }
    }

    private void Initialise()
    {       
        foreach (var processor in this.configuration.Processors)
        {
            try
            {
                Type processorType = Type.GetType(processor.Value);
                if (processorType != null && typeof(IProcessor).IsAssignableFrom(processorType))
                {
                    var processorInstance = (IProcessor)Activator.CreateInstance(processorType);
                    this.processors.Add(new Tuple<string, IProcessor>(processor.Key, processorInstance));
                }
            catch (Exception e)
            {               
            }
        }

        foreach (var handler in this.configuration.Handlers)
        {
            try
            {
                Type handlerType = Type.GetType(handler.Value);
                if (handlerType != null && typeof(IHandler).IsAssignableFrom(handlerType))
                {
                    var handlerInstance = (IHandler)Activator.CreateInstance(handlerType);
                    this.handlers.Add(new Tuple<string, IHandler>(handler.Key, handlerInstance));
                }
            }
            catch (Exception e)
            {   
            }
        }
    }
}

1 回答

  • 3

    你应该改变很少需要使用低级构造,例如 Thread ,TPL也适用于长时间运行的任务,并且使用它可以产生更灵活和可维护的代码 .

    您可以使用以下命令启动长时间运行的任

    Task.Factory.StartNew(() => {}, TaskCreationOptions.LongRunning);
    

    来自MSDN doco:

    LongRunning指定任务将是一个长时间运行的粗粒度操作,涉及比细粒度系统更少,更大的组件 . 它向TaskScheduler提供了一个提示,即可以保证超额订阅 . Oversubscription允许您创建比可用硬件线程数更多的线程 .

    因此调度程序可以创建额外的线程以确保 ThreadPool 容量足够 . 此外,手动创建 Thread 不会影响您在代码的其他部分使用TPL .

    就个人而言,我肯定会选择TPL而非手动创建线程 . 有一点学习曲线,但它是一个非常强大的库,适合各种各样的场景 .

相关问题