当前位置:项目开发->项目经验 ->终极版本的生产者消费者demo

原创版权标志终极版本的生产者消费者demo

作者:阿郎  发表时间:2018/1/19 8:45:27  阅读:
[摘要] 总结了前面几篇文章的精华,修改了一些,最后整理出来了适合我自己的多生产者多消费者的流水线式的基于事件松耦合的demo

    这应该是最后的一篇关于生产者消费者的文章了,前面写了很多,有自己的实现的,也有参考别人的。总觉得直接用微软的BlockingCollection来做不太好,就从<<.Net中的并行编程-7.基于BlockingCollection实现高性能异步队列>><<.Net中的并行编程-4.实现高性能异步队列>>整理了适合自己的demo。首先给出基于阻塞队列的多生产者多消费者代码:

public class ProcessQueue<T>
 {
 private BlockingCollection<T> _queue;
 private CancellationTokenSource _cancellationTokenSource;
 private CancellationToken _cancellToken;
 //内部线程池
 private List<Thread> _threadCollection;

 //队列是否正在处理数据
 private int _isProcessing;
 //有线程正在处理数据
 private const int Processing = 1;
 //没有线程处理数据
 private const int UnProcessing = 0;
 //队列是否可用
 private volatile bool _enabled = true;
 //内部处理线程数量
 private int _internalThreadCount;
 // 消费者处理事件
 public event Action<T> ProcessItemEvent;
 //处理异常,需要三个参数,当前队列实例,异常,当时处理的数据
 public event Action<dynamic, Exception, T> ProcessExceptionEvent;

 public ProcessQueue()
 {
 _queue = new BlockingCollection<T>();
 _cancellationTokenSource = new CancellationTokenSource();
 _internalThreadCount = 3;
 _cancellToken = _cancellationTokenSource.Token;
 _threadCollection = new List<Thread>();
 }

 public ProcessQueue(int internalThreadCount) : this()
 {
 this._internalThreadCount = internalThreadCount;
 }

 /// <summary>
 /// 队列内部元素的数量 
 /// </summary>
 public int GetInternalItemCount()
 {
 //return _queue.Count;
 return _threadCollection.Count;
 }
 //生产者生产
 public void Enqueue(T items)
 {
 if (items == null)
 {
 throw new ArgumentException("items");
 }

 _queue.Add(items);
 DataAdded();
 }

 public void Flush()
 {
 StopProcess();

 while (_queue.Count != 0)
 {
 T item = default(T);
 if (_queue.TryTake(out item))
 {
 try
 {
 ProcessItemEvent(item);
 }
 catch (Exception ex)
 {
 OnProcessException(ex, item);
 }
 }
 }
 }
 // 通知消费者消费队列元素
 private void DataAdded()
 {
 if (_enabled)
 {
 if (!IsProcessingItem())
 {
 Console.WriteLine("DataAdded");
 ProcessRangeItem();
 StartProcess();
 }
 }
 }

 //判断是否队列有线程正在处理 
 private bool IsProcessingItem()
 {
 // 替换第一个参数, 如果相等
 //int x = Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing);
 return !(Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing) == UnProcessing);
 }
 // 多消费者消费
 private void ProcessRangeItem()
 {
 for (int i = 0; i < this._internalThreadCount; i++)
 {
 ProcessItem();
 }
 }
 // 开启消费处理
 private void ProcessItem()
 {
 Thread currentThread = new Thread((state) =>
 {
 T item = default(T);
 while (_enabled)
 {
 try
 {
 try
 {
 if (!_queue.TryTake(out item))
 {
 //Console.WriteLine("阻塞队列为0时的item: {0}", item);
 //Console.WriteLine("ok!!!");
 break;
 }
 // 处理事件
 ProcessItemEvent(item);
 }
 catch (OperationCanceledException ex)
 {
 DebugHelper.DebugView(ex.ToString());
 }

 }
 catch (Exception ex)
 {
 OnProcessException(ex, item);
 }
 }
 });
 _threadCollection.Add(currentThread);
 }
 // 开启消费者
 private void StartProcess()
 {
 //Console.WriteLine("线程的数量: {0}", _threadCollection.Count);
 foreach (var thread in _threadCollection)
 {
 thread.Start();
 thread.IsBackground = true;
 }
 }
 // 终止运行
 private void StopProcess()
 {
 this._enabled = false;
 foreach (var thread in _threadCollection)
 {
 if (thread.IsAlive)
 {
 thread.Join();
 }
 }
 _threadCollection.Clear();
 }

 private void OnProcessException(Exception ex, T item)
 {
 var tempException = ProcessExceptionEvent;
 Interlocked.CompareExchange(ref ProcessExceptionEvent, null, null);

 if (tempException != null)
 {
 ProcessExceptionEvent(this, ex, item);
 }
 }

 }
调用demo:

static void Main(string[] args)
 {
 ProcessQueue<int> processQueue = new ProcessQueue<int>();
 processQueue.ProcessExceptionEvent += ProcessQueue_ProcessExceptionEvent;
 processQueue.ProcessItemEvent += ProcessQueue_ProcessItemEvent;
 

 for (int i = 0; i < 50; i++)
 {
 processQueue.Enqueue(i);
 }

 Console.WriteLine("阻塞队列的数量: {0}", processQueue.GetInternalItemCount());

 processQueue.Flush();

 Console.Read();
 }

 /// <summary>
 /// 该方法对入队的每个元素进行处理
 /// </summary>
 /// <param name="value"></param>
 private static void ProcessQueue_ProcessItemEvent(int value)
 {
 Console.WriteLine("输出: {0}", value);
 }

 /// <summary>
 /// 处理异常
 /// </summary>
 /// <param name="obj">队列实例</param>
 /// <param name="ex">异常对象</param>
 /// <param name="value">出错的数据</param>
 private static void ProcessQueue_ProcessExceptionEvent(dynamic obj, Exception ex, int value)
 {
 Console.WriteLine(ex.ToString());
 }

    异步队列的多生产者多消费者就第二篇再给吧

文章来源:C++技术网原创文章版权为网站和作者共同所有,会员文章禁止转载。非会员文章转载做好本文超链接即表示授权转载。通过文章下面的分享按钮可以自由分享所有文章。

返回顶部

在线提问
问题标题:
问题描述:(简陋的描述会导致问题被最后回答、没有针对性回答甚至无法解答。请确保问题描述的足够清楚。)