校园网站建设可行性分析,wordpress 重新生成缩略图,深圳建网站公司,网站自适应与响应式本文通过对.NET4.5的ThreadPool源码的分析讲解揭示.NET线程池的内幕#xff0c;并总结ThreadPool设计的好与不足。 线程池的作用线程池#xff0c;顾名思义#xff0c;线程对象池。Task和TPL都有用到线程池#xff0c;所以了解线程池的内幕有助于你写出更好的程序。由于篇幅… 本文通过对.NET4.5的ThreadPool源码的分析讲解揭示.NET线程池的内幕并总结ThreadPool设计的好与不足。 线程池的作用线程池顾名思义线程对象池。Task和TPL都有用到线程池所以了解线程池的内幕有助于你写出更好的程序。由于篇幅有限在这里我只讲解以下核心概念 线程池的大小如何调用线程池添加任务线程池如何执行任务 Threadpool也支持操控IOCP的线程但在这里我们不研究它涉及到task和TPL的会在其各自的博客中做详解。线程池的大小不管什么池总有尺寸ThreadPool也不例外。ThreadPool提供了4个方法来调整线程池的大小 SetMaxThreadsGetMaxThreadsSetMinThreadsGetMinThreads SetMaxThreads指定线程池最多可以有多少个线程而GetMaxThreads自然就是获取这个值。SetMinThreads指定线程池中最少存活的线程的数量而GetMinThreads就是获取这个值。为何要设置一个最大数量和有一个最小数量呢原来线程池的大小取决于若干因素如虚拟地址空间的大小等。比如你的计算机是4g内存而一个线程的初始堆栈大小为1m那么你最多能创建4g/1m的线程忽略操作系统本身以及其他进程内存分配正因为线程有内存开销所以如果线程池的线程过多而又没有被完全使用那么这就是对内存的一种浪费所以限制线程池的最大数是很make sense的。那么最小数又是为啥线程池就是线程的对象池对象池的最大的用处是重用对象。为啥要重用线程因为线程的创建与销毁都要占用大量的cpu时间。所以在高并发状态下线程池由于无需创建销毁线程节约了大量时间提高了系统的响应能力和吞吐量。最小数可以让你调整最小的存活线程数量来应对不同的高并发场景。如何调用线程池添加任务线程池主要提供了2个方法来调用QueueUserWorkItem和UnsafeQueueUserWorkItem。两个方法的代码基本一致除了attribute不同QueueUserWorkItem可以被partial trust的代码调用而UnsafeQueueUserWorkItem只能被full trust的代码调用。 public static bool QueueUserWorkItem(WaitCallback callBack) { StackCrawlMark stackMark StackCrawlMark.LookForMyCaller; return ThreadPool.QueueUserWorkItemHelper(callBack, (object) null, ref stackMark, true); } QueueUserWorkItemHelper首先调用ThreadPool.EnsureVMInitialized()来确保CLR虚拟机初始化VM是一个统称不是单指java虚拟机也可以指CLR的execution engine紧接着实例化ThreadPoolWorkQueue最后调用ThreadPoolWorkQueue的Enqueue方法并传入callback和true。 [SecurityCritical] public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal) { ThreadPoolWorkQueueThreadLocals queueThreadLocals (ThreadPoolWorkQueueThreadLocals) null; if (!forceGlobal) queueThreadLocals ThreadPoolWorkQueueThreadLocals.threadLocals; if (this.loggingEnabled) FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject((object) callback); if (queueThreadLocals ! null) { queueThreadLocals.workStealingQueue.LocalPush(callback); } else { ThreadPoolWorkQueue.QueueSegment comparand this.queueHead; while (!comparand.TryEnqueue(callback)) { Interlocked.CompareExchangeThreadPoolWorkQueue.QueueSegment(ref comparand.Next, new ThreadPoolWorkQueue.QueueSegment(), (ThreadPoolWorkQueue.QueueSegment) null); for (; comparand.Next ! null; comparand this.queueHead) Interlocked.CompareExchangeThreadPoolWorkQueue.QueueSegment(ref this.queueHead, comparand.Next, comparand); } } this.EnsureThreadRequested(); } ThreadPoolWorkQueue主要包含2个“queue”(实际是数组)一个为QueueSegmentglobal work queue另一个是WorkStealingQueue(local work queue)。两者具体的区别会在Task/TPL里讲解这里暂不解释。由于forceGlobal是true所以执行到了comparand.TryEnqueue(callback)也就是QueueSegment.TryEnqueue。comparand先从队列的头(queueHead)开始enqueue如果不行就继续往下enqueue成功后再赋值给queueHead。让我们来看看QueueSegment的源代码 public QueueSegment() { this.nodes new IThreadPoolWorkItem[256]; } public bool TryEnqueue(IThreadPoolWorkItem node) { int upper; int lower; this.GetIndexes(out upper, out lower); while (upper ! this.nodes.Length) { if (this.CompareExchangeIndexes(ref upper, upper 1, ref lower, lower)) { Volatile.WriteIThreadPoolWorkItem(ref this.nodes[upper], node); return true; } } return false; } 这个所谓的global work queue实际上是一个IThreadPoolWorkItem的数组而且限死256这是为啥难道是因为和IIS线程池(也只有256个线程对齐使用interlock和内存写屏障volatile.write来保证nodes的正确性比起同步锁性能有很大的提高。最后调用EnsureThreadRequestedEnsureThreadRequested会调用QCall把请求发送至CLR由CLR调度ThreadPool。线程池如何执行任务线程被调度后通过ThreadPoolWorkQueue的Dispatch方法来执行callback。 internal static bool Dispatch() { ThreadPoolWorkQueue threadPoolWorkQueue ThreadPoolGlobals.workQueue; int tickCount Environment.TickCount; threadPoolWorkQueue.MarkThreadRequestSatisfied(); threadPoolWorkQueue.loggingEnabled FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, (EventKeywords) 18); bool flag1 true; IThreadPoolWorkItem callback (IThreadPoolWorkItem) null; try { ThreadPoolWorkQueueThreadLocals tl threadPoolWorkQueue.EnsureCurrentThreadHasQueue(); while ((long) (Environment.TickCount - tickCount) (long) ThreadPoolGlobals.tpQuantum) { try { } finally { bool missedSteal false; threadPoolWorkQueue.Dequeue(tl, out callback, out missedSteal); if (callback null) flag1 missedSteal; else threadPoolWorkQueue.EnsureThreadRequested(); } if (callback null) return true; if (threadPoolWorkQueue.loggingEnabled) FrameworkEventSource.Log.ThreadPoolDequeueWorkObject((object) callback); if (ThreadPoolGlobals.enableWorkerTracking) { bool flag2 false; try { try { } finally { ThreadPool.ReportThreadStatus(true); flag2 true; } callback.ExecuteWorkItem(); callback (IThreadPoolWorkItem) null; } finally { if (flag2) ThreadPool.ReportThreadStatus(false); } } else { callback.ExecuteWorkItem(); callback (IThreadPoolWorkItem) null; } if (!ThreadPool.NotifyWorkItemComplete()) return false; } return true; } catch (ThreadAbortException ex) { if (callback ! null) callback.MarkAborted(ex); flag1 false; } finally { if (flag1) threadPoolWorkQueue.EnsureThreadRequested(); } return true; } while语句判断如果执行时间少于30ms会不断继续执行下一个callback。这是因为大多数机器线程切换大概在30ms如果该线程只执行了不到30ms就在等待中断线程切换那就太浪费CPU了浪费可耻啊Dequeue负责找到需要执行的callback public void Dequeue(ThreadPoolWorkQueueThreadLocals tl, out IThreadPoolWorkItem callback, out bool missedSteal) { callback (IThreadPoolWorkItem) null; missedSteal false; ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue1 tl.workStealingQueue; workStealingQueue1.LocalPop(out callback); if (callback null) { for (ThreadPoolWorkQueue.QueueSegment comparand this.queueTail; !comparand.TryDequeue(out callback) comparand.Next ! null comparand.IsUsedUp(); comparand this.queueTail) Interlocked.CompareExchangeThreadPoolWorkQueue.QueueSegment(ref this.queueTail, comparand.Next, comparand); } if (callback ! null) return; ThreadPoolWorkQueue.WorkStealingQueue[] current ThreadPoolWorkQueue.allThreadQueues.Current; int num tl.random.Next(current.Length); for (int length current.Length; length 0; --length) { ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue2 Volatile.ReadThreadPoolWorkQueue.WorkStealingQueue(ref current[num % current.Length]); if (workStealingQueue2 ! null workStealingQueue2 ! workStealingQueue1 workStealingQueue2.TrySteal(out callback, ref missedSteal)) break; num; } } 因为我们把callback添加到了global work queue所以local work queue(workStealingQueue.LocalPop(out callback))找不到callbacklocal work queue查找callback会在task里讲解。接着又去global work queue查找先从global work queue的起始位置查找直至尾部因此global work quque里的callback是FIFO的执行顺序。 public bool TryDequeue(out IThreadPoolWorkItem node) { int upper; int lower; this.GetIndexes(out upper, out lower); while (lower ! upper) { // ISSUE: explicit reference operation // ISSUE: variable of a reference type int prevUpper upper; // ISSUE: explicit reference operation int newUpper ^prevUpper; // ISSUE: explicit reference operation // ISSUE: variable of a reference type int prevLower lower; // ISSUE: explicit reference operation int newLower ^prevLower 1; if (this.CompareExchangeIndexes(prevUpper, newUpper, prevLower, newLower)) { SpinWait spinWait new SpinWait(); while ((node Volatile.ReadIThreadPoolWorkItem(ref this.nodes[lower])) null) spinWait.SpinOnce(); this.nodes[lower] (IThreadPoolWorkItem) null; return true; } } node (IThreadPoolWorkItem) null; return false; } 使用自旋锁和内存读屏障来避免内核态和用户态的切换提高了获取callback的性能。如果还是没有callback那么就从所有的local work queue里随机选取一个然后在该local work queue里“偷取”一个任务(callback)。拿到callback后执行callback.ExecuteWorkItem()通知完成。总结ThreadPool提供了方法调整线程池最少活跃的线程来应对不同的并发场景。ThreadPool带有2个work queue一个golbal一个local。执行时先从local找任务接着去global最后才会去随机选取一个local偷一个任务其中global是FIFO的执行顺序。Work queue实际上是数组使用了大量的自旋锁和内存屏障来提高性能。但是在偷取任务上是否可以考虑得更多随机选择一个local太随意。首先要考虑偷取的队列上必须有可执行任务其次可以选取一个不在调度中的线程的local work queue这样降低了自旋锁的可能性加快了偷取的速度最后偷取的时候可以考虑像golang一样偷取别人queue里一半的任务因为执行完偷到的这一个任务之后下次该线程再次被调度到还是可能没任务可执行还得去偷取别人的任务这样既浪费CPU时间又让任务在线程上分布不均匀降低了系统吞吐量 另外如果禁用log和ETW trace可以使ThreadPool的性能更进一步。 原文地址 http://www.cnblogs.com/newbier/p/6192882.html .NET社区新闻深度好文微信中搜索dotNET跨平台或扫描二维码关注