当前位置:开发工具->.NET ->Interlocked在多线程下进行原子操作无锁无阻塞的实现线程运行状态判断

原创版权标志Interlocked在多线程下进行原子操作无锁无阻塞的实现线程运行状态判断

作者:阿郎  发表时间:2018/1/28 0:08:46  阅读:
[摘要] 巧妙地使用Interlocked的各个方法,再无锁无阻塞的情况下判断出所有线程的运行完成状态。
使用支付宝扫码领红包,余额宝付款才可以使用红包哦!不要忘记哈。每天扫一次,天天赚红包!!可以将二维码保存到手机,每天直接扫码领红包啦!!
巧妙地使用Interlocked的各个方法,再无锁无阻塞的情况下判断出所有线程的运行完成状态。

昨晚耐着性子看完了clr via c#的第29章<<基元线程同步构造>>,尽管这本书不是第一次看了,但是之前看的都是一带而过,没有深入理解,甚至可以说是不理解,实习了之后发现自己的知识原来这么表面,很多的实现都不能做出来,这很大程度上打击了我,而且,春招也快来了,更需要打扎实基础。引起我注意的是jeffrey在第29章说的:使用Interlocked,代码很短,绝不阻塞任何线程,二期使用线程池线程来实现自动伸缩。下载了源码,然后分析了下书中的示例,code如下:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace vlr_via_cs
{
 internal static class AsyncCoordinatorDemo
 {
 public static void Go()
 {
 const Int32 timeout = 50000; // Change to desired timeout
 MultiWebRequests act = new MultiWebRequests(timeout);
 Console.WriteLine("All operations initiated (Timeout={0}). Hit <Enter> to cancel.",
 (timeout == Timeout.Infinite) ? "Infinite" : (timeout.ToString() + "ms"));
 Console.ReadLine();
 act.Cancel();

 Console.WriteLine();
 Console.WriteLine("Hit enter to terminate.");
 Console.ReadLine();
 }

 private sealed class MultiWebRequests
 {
 // This helper class coordinates all the asynchronous operations
 private AsyncCoordinator m_ac = new AsyncCoordinator();

 // Set of Web servers we want to query & their responses (Exception or Int32)
 private Dictionary<String, Object> m_servers = new Dictionary<String, Object> {
 { "http://cjjjs.com/", null },
 { "http://cnblogs.com/", null },
 { "http://www.jobbole.com/", null }
 };

 public MultiWebRequests(Int32 timeout = Timeout.Infinite)
 {
 // Asynchronously initiate all the requests all at once
 var httpClient = new HttpClient();
 foreach (var server in m_servers.Keys)
 {
 m_ac.AboutToBegin(1);
 httpClient.GetByteArrayAsync(server).ContinueWith(task => ComputeResult(server, task));
 }

 // Tell AsyncCoordinator that all operations have been initiated and to call
 // AllDone when all operations complete, Cancel is called, or the timeout occurs
 m_ac.AllBegun(AllDone, timeout);
 }

 private void ComputeResult(String server, Task<Byte[]> task)
 {
 Object result;
 if (task.Exception != null)
 {
 result = task.Exception.InnerException;
 }
 else
 {
 // Process I/O completion here on thread pool thread(s)
 // Put your own compute-intensive algorithm here...
 result = task.Result.Length; // This example just returns the length
 }

 // Save result (exception/sum) and indicate that 1 operation completed
 m_servers[server] = result;
 m_ac.JustEnded();
 }

 // Calling this method indicates that the results don't matter anymore
 public void Cancel() { m_ac.Cancel(); }

 // This method is called after all Web servers respond, 
 // Cancel is called, or the timeout occurs
 private void AllDone(CoordinationStatus status)
 {
 switch (status)
 {
 case CoordinationStatus.Cancel:
 Console.WriteLine("Operation canceled.");
 break;

 case CoordinationStatus.Timeout:
 Console.WriteLine("Operation timed-out.");
 break;

 case CoordinationStatus.AllDone:
 Console.WriteLine("Operation completed; results below:");
 foreach (var server in m_servers)
 {
 Console.Write("{0} ", server.Key);
 Object result = server.Value;
 if (result is Exception)
 {
 Console.WriteLine("failed due to {0}.", result.GetType().Name);
 }
 else
 {
 Console.WriteLine("returned {0:N0} bytes.", result);
 }
 }
 break;
 }
 }
 }

 private enum CoordinationStatus
 {
 AllDone,
 Timeout,
 Cancel
 };

 private sealed class AsyncCoordinator
 {
 private Int32 m_opCount = 1; // Decremented when AllBegun calls JustEnded
 private Int32 m_statusReported = 0; // 0=false, 1=true
 private Action<CoordinationStatus> m_callback;
 private Timer m_timer;

 // This method MUST be called BEFORE initiating an operation
 public void AboutToBegin(Int32 opsToAdd = 1)
 {
 Interlocked.Add(ref m_opCount, opsToAdd);
 }

 // This method MUST be called AFTER an operations result has been processed
 public void JustEnded()
 {
 if (Interlocked.Decrement(ref m_opCount) == 0)
 ReportStatus(CoordinationStatus.AllDone);
 }

 // This method MUST be called AFTER initiating ALL operations
 public void AllBegun(Action<CoordinationStatus> callback, Int32 timeout = Timeout.Infinite)
 {
 m_callback = callback;
 if (timeout != Timeout.Infinite)
 {
 // 在指定的时间点(dueTime) 调用回调函数,随后在指定的时间间隔(period)调用回调函数
 m_timer = new Timer(TimeExpired, null, timeout, Timeout.Infinite);
 }
 JustEnded();
 }

 // 处理过时的线程
 private void TimeExpired(Object o) {
 ReportStatus(CoordinationStatus.Timeout);
 }

 public void Cancel()
 {
 if (m_callback == null)
 throw new InvalidOperationException("Cancel cannot be called before AllBegun");
 ReportStatus(CoordinationStatus.Cancel);
 }

 private void ReportStatus(CoordinationStatus status)
 {
 if (m_timer != null)
 { // If timer is still in play, kill it
 Timer timer = Interlocked.Exchange(ref m_timer, null);
 if (timer != null) timer.Dispose();
 }

 // If status has never been reported, report it; else ignore it
 if (Interlocked.Exchange(ref m_statusReported, 1) == 0)
 m_callback(status);
 }
 }
 }


 class Program
 {
 static void Main(string[] args)
 {
 AsyncCoordinatorDemo.Go();

 Console.Read();
 }
 }
}

的确是无锁的操作,Interlocked方法是用户模式下的原子操作,针对的是CPU,不是线程内存,而且它是自旋等待的,耗费的是CPU资源。分析了下AsyncCoordinator类,主要就是利用Interlocked的Add方法,实时计数线程的数量,随后待一个线程运行的最后又调用Interlocked的Decrement方法自减。如果你留心的话,你会发现,目前绝大多数的并发判断中都用到了Interlocked的这些方法,尤其是interlocked的anything模式下的compareexchange方法,在这里提一嘴,除了compareexchange和exchange方法的返回值是返回ref类型原先的值之外,其余的方法都是返回改变之后的值。最后我们可以通过AllBegun方法来判断是不是所有的线程都执行完了,随后将状态变量m_statusReported设置为1,防止在进行状态判断。

这个类很好,之前写并发的时候,老是烦恼怎么判断并发是否已经完事了,又不想用到阻塞,这个类很好,当然应用到具体项目中可能还需要改,但是基本的模型还是这个,不变的。

有点感慨:好东西需要我们自己去发掘,之前查生产者消费者模型的时候,java代码一大堆,愣是没有看到几个C#,就算有也是简易,尽管可以把java的改变为C#的,但有点感慨C#的技术栈和资源少
微信扫码关注公众号CPP技术网,微信号cpp_coder,关注我们的公众号,阅读更多精彩内容!每天还可以领取大红包哦!!!每天还可以领取大红包哦!!!每天还可以领取大红包哦!!!
文章来源:C++技术网原创文章版权为网站和作者共同所有,会员文章禁止转载。非会员文章转载做好本文超链接即表示授权转载。通过文章下面的分享按钮可以自由分享所有文章。

返回顶部

在线提问
问题标题:
问题描述:(简陋的描述会导致问题被最后回答、没有针对性回答甚至无法解答。请确保问题描述的足够清楚。)