网站建设极地网,wordpress连接微博专业版 破解,网页设计与制作有什么感想,重庆交易网站建设目录
一、前言二、ConcurrentBag类三、 ConcurrentBag线程安全实现原理 1. ConcurrentBag的私有字段2. 用于数据存储的ThreadLocalList类3. ConcurrentBag实现新增元素4. ConcurrentBag 如何实现迭代器模式四、总结笔者水平有限#xff0c;如果错误欢迎各位批评指正#xff…目录
一、前言二、ConcurrentBag类三、 ConcurrentBag线程安全实现原理 1. ConcurrentBag的私有字段2. 用于数据存储的ThreadLocalList类3. ConcurrentBag实现新增元素4. ConcurrentBag 如何实现迭代器模式四、总结笔者水平有限如果错误欢迎各位批评指正一、前言#
笔者最近在做一个项目项目中为了提升吞吐量使用了消息队列中间实现了生产消费模式在生产消费者模式中需要有一个集合来存储生产者所生产的物品笔者使用了最常见的ListT集合类型。
由于生产者线程有很多个消费者线程也有很多个所以不可避免的就产生了线程同步的问题。开始笔者是使用lock关键字进行线程同步但是性能并不是特别理想然后有网友说可以使用SynchronizedListT来代替使用ListT达到线程安全的目的。于是笔者就替换成了SynchronizedListT但是发现性能依旧糟糕于是查看了SynchronizedListT的源代码发现它就是简单的在ListT提供的API的基础上加了lock所以性能基本与笔者实现方式相差无几。
最后笔者找到了解决的方案使用ConcurrentBagT类来实现性能有很大的改观于是笔者查看了ConcurrentBagT的源代码实现非常精妙特此在这记录一下。
二、ConcurrentBag类#
ConcurrentBagT实现了IProducerConsumerCollectionT接口该接口主要用于生产者消费者模式下可见该类基本就是为生产消费者模式定制的。然后还实现了常规的IReadOnlyCollectionT类实现了该类就需要实现IEnumerableT、IEnumerable、 ICollection类。
ConcurrentBagT对外提供的方法没有ListT那么多但是同样有Enumerable实现的扩展方法。类本身提供的方法如下所示。
名称说明Add将对象添加到 ConcurrentBag 中。CopyTo从指定数组索引开始将 ConcurrentBag 元素复制到现有的一维 Array 中。Equals(Object)确定指定的 Object 是否等于当前的 Object。 继承自 Object。Finalize允许对象在“垃圾回收”回收之前尝试释放资源并执行其他清理操作。 继承自 Object。GetEnumerator返回循环访问 ConcurrentBag 的枚举器。GetHashCode用作特定类型的哈希函数。 继承自 Object。GetType获取当前实例的 Type。 继承自 Object。MemberwiseClone创建当前 Object 的浅表副本。 继承自 Object。ToArray将 ConcurrentBag 元素复制到新数组。ToString返回表示当前对象的字符串。 继承自 Object。TryPeek尝试从 ConcurrentBag 返回一个对象但不移除该对象。TryTake尝试从 ConcurrentBag 中移除并返回对象。
三、 ConcurrentBag线程安全实现原理#
1. ConcurrentBag的私有字段#
ConcurrentBag线程安全实现主要是通过它的数据存储的结构和细颗粒度的锁。 Copy
public class ConcurrentBagT : IProducerConsumerCollectionT, IReadOnlyCollectionT { // ThreadLocalList对象包含每个线程的数据 ThreadLocalThreadLocalList m_locals; // 这个头指针和尾指针指向中的第一个和最后一个本地列表这些本地列表分散在不同线程中 // 允许在线程局部对象上枚举 volatile ThreadLocalList m_headList, m_tailList; // 这个标志是告知操作线程必须同步操作 // 在GlobalListsLock 锁中 设置 bool m_needSync; }
首选我们来看它声明的私有字段其中需要注意的是集合的数据是存放在ThreadLocal线程本地存储中的。也就是说访问它的每个线程会维护一个自己的集合数据列表一个集合中的数据可能会存放在不同线程的本地存储空间中所以如果线程访问自己本地存储的对象那么是没有问题的这就是实现线程安全的第一层使用线程本地存储数据。
然后可以看到ThreadLocalList m_headList, m_tailList;这个是存放着本地列表对象的头指针和尾指针通过这两个指针我们就可以通过遍历的方式来访问所有本地列表。它使用volatile修饰不允许线程进行本地缓存每个线程的读写都是直接操作在共享内存上这就保证了变量始终具有一致性。任何线程在任何时间进行读写操作均是最新值。对于volatile修饰符感谢我是攻城狮指出描述错误。
最后又定义了一个标志这个标志告知操作线程必须进行同步操作这是实现了一个细颗粒度的锁因为只有在几个条件满足的情况下才需要进行线程同步。
2. 用于数据存储的ThreadLocalList类#
接下来我们来看一下ThreadLocalList类的构造该类就是实际存储了数据的位置。实际上它是使用双向链表这种结构进行数据存储。 Copy
[Serializable] // 构造了双向链表的节点 internal class Node { public Node(T value) { m_value value; } public readonly T m_value; public Node m_next; public Node m_prev; } /// summary /// 集合操作类型 /// /summary internal enum ListOperation { None, Add, Take }; /// summary /// 线程锁定的类 /// /summary internal class ThreadLocalList { // 双向链表的头结点 如果为null那么表示链表为空 internal volatile Node m_head; // 双向链表的尾节点 private volatile Node m_tail; // 定义当前对List进行操作的种类 // 与前面的 ListOperation 相对应 internal volatile int m_currentOp; // 这个列表元素的计数 private int m_count; // The stealing count // 这个不是特别理解 好像是在本地列表中 删除某个Node 以后的计数 internal int m_stealCount; // 下一个列表 可能会在其它线程中 internal volatile ThreadLocalList m_nextList; // 设定锁定是否已进行 internal bool m_lockTaken; // The owner thread for this list internal Thread m_ownerThread; // 列表的版本只有当列表从空变为非空统计是底层 internal volatile int m_version; /// summary /// ThreadLocalList 构造器 /// /summary /// param nameownerThread拥有这个集合的线程/param internal ThreadLocalList(Thread ownerThread) { m_ownerThread ownerThread; } /// summary /// 添加一个新的item到链表首部 /// /summary /// param nameitemThe item to add./param /// param nameupdateCount是否更新计数./param internal void Add(T item, bool updateCount) { checked { m_count; } Node node new Node(item); if (m_head null) { Debug.Assert(m_tail null); m_head node; m_tail node; m_version; // 因为进行初始化了所以将空状态改为非空状态 } else { // 使用头插法 将新的元素插入链表 node.m_next m_head; m_head.m_prev node; m_head node; } if (updateCount) // 更新计数以避免此添加同步时溢出 { m_count m_count - m_stealCount; m_stealCount 0; } } /// summary /// 从列表的头部删除一个item /// /summary /// param nameresultThe removed item/param internal void Remove(out T result) { // 双向链表删除头结点数据的流程 Debug.Assert(m_head ! null); Node head m_head; m_head m_head.m_next; if (m_head ! null) { m_head.m_prev null; } else { m_tail null; } m_count--; result head.m_value; } /// summary /// 返回列表头部的元素 /// /summary /// param nameresultthe peeked item/param /// returnsTrue if succeeded, false otherwise/returns internal bool Peek(out T result) { Node head m_head; if (head ! null) { result head.m_value; return true; } result default(T); return false; } /// summary /// 从列表的尾部获取一个item /// /summary /// param nameresultthe removed item/param /// param nameremoveremove or peek flag/param internal void Steal(out T result, bool remove) { Node tail m_tail; Debug.Assert(tail ! null); if (remove) // Take operation { m_tail m_tail.m_prev; if (m_tail ! null) { m_tail.m_next null; } else { m_head null; } // Increment the steal count m_stealCount; } result tail.m_value; } /// summary /// 获取总计列表计数, 它不是线程安全的, 如果同时调用它, 则可能提供不正确的计数 /// /summary internal int Count { get { return m_count - m_stealCount; } } }
从上面的代码中我们可以更加验证之前的观点就是ConcurentBagT在一个线程中存储数据时使用的是双向链表ThreadLocalList实现了一组对链表增删改查的方法。
3. ConcurrentBag实现新增元素#
接下来我们看一看ConcurentBagT是如何新增元素的。 Copy
/// summary /// 尝试获取无主列表无主列表是指线程已经被暂停或者终止但是集合中的部分数据还存储在那里 /// 这是避免内存泄漏的方法 /// /summary /// returns/returns private ThreadLocalList GetUnownedList() { //此时必须持有全局锁 Contract.Assert(Monitor.IsEntered(GlobalListsLock)); // 从头线程列表开始枚举 找到那些已经被关闭的线程 // 将它所在的列表对象 返回 ThreadLocalList currentList m_headList; while (currentList ! null) { if (currentList.m_ownerThread.ThreadState System.Threading.ThreadState.Stopped) { currentList.m_ownerThread Thread.CurrentThread; // the caller should acquire a lock to make this line thread safe return currentList; } currentList currentList.m_nextList; } return null; } /// summary /// 本地帮助方法通过线程对象检索线程线程本地列表 /// /summary /// param nameforceCreate如果列表不存在那么创建新列表/param /// returnsThe local list object/returns private ThreadLocalList GetThreadList(bool forceCreate) { ThreadLocalList list m_locals.Value; if (list ! null) { return list; } else if (forceCreate) { // 获取用于更新操作的 m_tailList 锁 lock (GlobalListsLock) { // 如果头列表等于空那么说明集合中还没有元素 // 直接创建一个新的 if (m_headList null) { list new ThreadLocalList(Thread.CurrentThread); m_headList list; m_tailList list; } else { // ConcurrentBag内的数据是以双向链表的形式分散存储在各个线程的本地区域中 // 通过下面这个方法 可以找到那些存储有数据 但是已经被停止的线程 // 然后将已停止线程的数据 移交到当前线程管理 list GetUnownedList(); // 如果没有 那么就新建一个列表 然后更新尾指针的位置 if (list null) { list new ThreadLocalList(Thread.CurrentThread); m_tailList.m_nextList list; m_tailList list; } } m_locals.Value list; } } else { return null; } Debug.Assert(list ! null); return list; } /// summary /// Adds an object to the see crefConcurrentBag{T}/. /// /summary /// param nameitemThe object to be added to the /// see crefConcurrentBag{T}/. The value can be a null reference /// (Nothing in Visual Basic) for reference types./param public void Add(T item) { // 获取该线程的本地列表, 如果此线程不存在, 则创建一个新列表 (第一次调用 add) ThreadLocalList list GetThreadList(true); // 实际的数据添加操作 在AddInternal中执行 AddInternal(list, item); } /// summary /// /summary /// param namelist/param /// param nameitem/param private void AddInternal(ThreadLocalList list, T item) { bool lockTaken false; try { #pragma warning disable 0420 Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add); #pragma warning restore 0420 // 同步案例: // 如果列表计数小于两个, 因为是双向链表的关系 为了避免与任何窃取线程发生冲突 必须获取锁 // 如果设置了 m_needSync, 这意味着有一个线程需要冻结包 也必须获取锁 if (list.Count 2 || m_needSync) { // 将其重置为None 以避免与窃取线程的死锁 list.m_currentOp (int)ListOperation.None; // 锁定当前对象 Monitor.Enter(list, ref lockTaken); } // 调用 ThreadLocalList.Add方法 将数据添加到双向链表中 // 如果已经锁定 那么说明线程安全 可以更新Count 计数 list.Add(item, lockTaken); } finally { list.m_currentOp (int)ListOperation.None; if (lockTaken) { Monitor.Exit(list); } } }
从上面代码中我们可以很清楚的知道Add()方法是如何运行的其中的关键就是GetThreadList()方法通过该方法可以获取当前线程的数据存储列表对象假如不存在数据存储列表它会自动创建或者通过GetUnownedList()方法来寻找那些被停止但是还存储有数据列表的线程然后将数据列表返回给当前线程中防止了内存泄漏。
在数据添加的过程中实现了细颗粒度的lock同步锁所以性能会很高。删除和其它操作与新增类似本文不再赘述。
4. ConcurrentBag 如何实现迭代器模式#
看完上面的代码后我很好奇ConcurrentBagT是如何实现IEnumerator来实现迭代访问的因为ConcurrentBagT是通过分散在不同线程中的ThreadLocalList来存储数据的那么在实现迭代器模式时过程会比较复杂。
后面再查看了源码之后发现ConcurrentBagT为了实现迭代器模式将分在不同线程中的数据全都存到一个ListT集合中然后返回了该副本的迭代器。所以每次访问迭代器它都会新建一个ListT的副本这样虽然浪费了一定的存储空间但是逻辑上更加简单了。 Copy
/// summary /// 本地帮助器方法释放所有本地列表锁 /// /summary private void ReleaseAllLocks() { // 该方法用于在执行线程同步以后 释放掉所有本地锁 // 通过遍历每个线程中存储的 ThreadLocalList对象 释放所占用的锁 ThreadLocalList currentList m_headList; while (currentList ! null) { if (currentList.m_lockTaken) { currentList.m_lockTaken false; Monitor.Exit(currentList); } currentList currentList.m_nextList; } } /// summary /// 从冻结状态解冻包的本地帮助器方法 /// /summary /// param namelockTakenThe lock taken result from the Freeze method/param private void UnfreezeBag(bool lockTaken) { // 首先释放掉 每个线程中 本地变量的锁 // 然后释放全局锁 ReleaseAllLocks(); m_needSync false; if (lockTaken) { Monitor.Exit(GlobalListsLock); } } /// summary /// 本地帮助器函数等待所有未同步的操作 /// /summary private void WaitAllOperations() { Contract.Assert(Monitor.IsEntered(GlobalListsLock)); ThreadLocalList currentList m_headList; // 自旋等待 等待其它操作完成 while (currentList ! null) { if (currentList.m_currentOp ! (int)ListOperation.None) { SpinWait spinner new SpinWait(); // 有其它线程进行操作时会将cuurentOp 设置成 正在操作的枚举 while (currentList.m_currentOp ! (int)ListOperation.None) { spinner.SpinOnce(); } } currentList currentList.m_nextList; } } /// summary /// 本地帮助器方法获取所有本地列表锁 /// /summary private void AcquireAllLocks() { Contract.Assert(Monitor.IsEntered(GlobalListsLock)); bool lockTaken false; ThreadLocalList currentList m_headList; // 遍历每个线程的ThreadLocalList 然后获取对应ThreadLocalList的锁 while (currentList ! null) { // 尝试/最后 bllock 以避免在获取锁和设置所采取的标志之间的线程港口 try { Monitor.Enter(currentList, ref lockTaken); } finally { if (lockTaken) { currentList.m_lockTaken true; lockTaken false; } } currentList currentList.m_nextList; } } /// summary /// Local helper method to freeze all bag operations, it /// 1- Acquire the global lock to prevent any other thread to freeze the bag, and also new new thread can be added /// to the dictionary /// 2- Then Acquire all local lists locks to prevent steal and synchronized operations /// 3- Wait for all un-synchronized operations to be done /// /summary /// param namelockTakenRetrieve the lock taken result for the global lock, to be passed to Unfreeze method/param private void FreezeBag(ref bool lockTaken) { Contract.Assert(!Monitor.IsEntered(GlobalListsLock)); // 全局锁定可安全地防止多线程调用计数和损坏 m_needSync Monitor.Enter(GlobalListsLock, ref lockTaken); // 这将强制同步任何将来的添加/执行操作 m_needSync true; // 获取所有列表的锁 AcquireAllLocks(); // 等待所有操作完成 WaitAllOperations(); } /// summary /// 本地帮助器函数返回列表中的包项, 这主要由 CopyTo 和 ToArray 使用。 /// 这不是线程安全, 应该被称为冻结/解冻袋块 /// 本方法是私有的 只有使用 Freeze/UnFreeze之后才是安全的 /// /summary /// returnsList the contains the bag items/returns private ListT ToList() { Contract.Assert(Monitor.IsEntered(GlobalListsLock)); // 创建一个新的List ListT list new ListT(); ThreadLocalList currentList m_headList; // 遍历每个线程中的ThreadLocalList 将里面的Node的数据 添加到list中 while (currentList ! null) { Node currentNode currentList.m_head; while (currentNode ! null) { list.Add(currentNode.m_value); currentNode currentNode.m_next; } currentList currentList.m_nextList; } return list; } /// summary /// Returns an enumerator that iterates through the see /// crefConcurrentBag{T}/. /// /summary /// returnsAn enumerator for the contents of the see /// crefConcurrentBag{T}/./returns /// remarks /// The enumeration represents a moment-in-time snapshot of the contents /// of the bag. It does not reflect any updates to the collection after /// see crefGetEnumerator/ was called. The enumerator is safe to use /// concurrently with reads from and writes to the bag. /// /remarks public IEnumeratorT GetEnumerator() { // Short path if the bag is empty if (m_headList null) return new ListT().GetEnumerator(); // empty list bool lockTaken false; try { // 首先冻结整个 ConcurrentBag集合 FreezeBag(ref lockTaken); // 然后ToList 再拿到 List的 IEnumerator return ToList().GetEnumerator(); } finally { UnfreezeBag(lockTaken); } }
由上面的代码可知道为了获取迭代器对象总共进行了三步主要的操作。 使用FreezeBag()方法冻结整个ConcurrentBagT集合。因为需要生成集合的ListT副本生成副本期间不能有其它线程更改损坏数据。将ConcurrrentBagT生成ListT副本。因为ConcurrentBagT存储数据的方式比较特殊直接实现迭代器模式困难考虑到线程安全和逻辑最佳的办法是生成一个副本。完成以上操作以后就可以使用UnfreezeBag()方法解冻整个集合。那么FreezeBag()方法是如何来冻结整个集合的呢也是分为三步走。 首先获取全局锁通过Monitor.Enter(GlobalListsLock, ref lockTaken);这样一条语句这样其它线程就不能冻结集合。然后获取所有线程中ThreadLocalList的锁通过AcquireAllLocks()方法来遍历获取。这样其它线程就不能对它进行操作损坏数据。等待已经进入了操作流程线程结束通过WaitAllOperations()方法来实现该方法会遍历每一个ThreadLocalList对象的m_currentOp属性确保全部处于None操作。完成以上流程后那么就是真正的冻结了整个ConcurrentBagT集合要解冻的话也类似。在此不再赘述。
四、总结#
下面给出一张图描述了ConcurrentBagT是如何存储数据的。通过每个线程中的ThreadLocal来实现线程本地存储每个线程中都有这样的结构互不干扰。然后每个线程中的m_headList总是指向ConcurrentBagT的第一个列表m_tailList指向最后一个列表。列表与列表之间通过m_locals 下的 m_nextList相连构成一个单链表。
数据存储在每个线程的m_locals中通过Node类构成一个双向链表。 PS: 要注意m_tailList和m_headList并不是存储在ThreadLocal中而是所有的线程共享一份。 以上就是有关ConcurrentBagT类的实现笔者的一些记录和解析。
笔者水平有限如果错误欢迎各位批评指正
附上ConcurrentBagT源码地址戳一戳
作者InCerry
出处https://www.cnblogs.com/InCerry/p/9497729.html
版权本文采用「署名 4.0 国际」知识共享许可协议进行许可。 https://www.cnblogs.com/fancunwei/p/9442469.html orleans源码里面有用过许多和线程/队列相关的类 System.Collections.Concurrent下面的类ConcurrentQueue和ConcurrentDictionary等还有Interlocked/BlockingCollection等。 它使用volatile修饰所以它是线程安全的。有误吧volatile并不保证线程安全。 可能这句话笔者描述有遗漏因为多个线程同时访问一个变量允许每个线程进行本地缓存这就导致了变量的不一致性。volatile修饰的变量不允许线程进行本地缓存每个线程的读写都是直接操作在共享内存上这就保证了变量始终具有一致性。引用指针的大小和CPU位数一般都是一致所以操作使原子性的线程安全的。