东至网站制作,建筑工程网状结构,深圳网络推广优化,长沙有哪些网络平台公司作为多线程和并行计算不得不考虑的问题就是临界资源的访问问题#xff0c;解决临界资源的访问通常是加锁或者是使用信号量#xff0c;这个大家应该很熟悉了。 而集合作为一种重要的临界资源#xff0c;通用性更广#xff0c;为了让大家更安全的使用它们#xff0c;微软为我… 作为多线程和并行计算不得不考虑的问题就是临界资源的访问问题解决临界资源的访问通常是加锁或者是使用信号量这个大家应该很熟悉了。 而集合作为一种重要的临界资源通用性更广为了让大家更安全的使用它们微软为我们带来了强大的并行集合System.Collections.Concurrent里面的各位仁兄们。 首先咱们从一个经典的问题谈起。
生产者消费者问题 这个问题是最为经典的多线程应用问题简单的表述这个问题就是有一个或多个线程(生产者线程)产生一些数据同时还有一个或者多个线程(消费者线程)要取出这些数据并执行一些相应的工作。如下图所示 下面就是使用程序去描述这个问题了。 最直接的想法可能是这样 static void Main(string[] args)
{int count 0;// 临界资源区var queue new Queuestring();// 生产者线程Task.Factory.StartNew(() {while (true){queue.Enqueue(value count);count;}});// 消费者线程1Task.Factory.StartNew(() {while (true){if (queue.Count 0){string value queue.Dequeue();Console.WriteLine(Worker 1: value);}}});// 消费者线程2Task.Factory.StartNew(() {while (true){if (queue.Count 0){string value queue.Dequeue();Console.WriteLine(Worker 2: value);}}});Thread.Sleep(50000);
} 使用Queuestring模拟了一个简单的资源池一个生产者放数据两个消费者消费数据。上面这个程序运行以后会产生异常异常的原因很简单当某个时刻第一个消费者判断queue.Count 0为true时就会到Queue中取数据但是这个时候数据可能会被第二个消费者拿走了因为第二个消费者也判断出此时有数据可取。这是一个简单的临界资源线程安全问题。 知道问题了那么如何解决呢 第一种方案是加锁这个方案是可行的很多时候我们也是这么做的包括微软早期实现线程安全的ArrayList和Hashtable内部(Synchronized方法)也是这么实现的。这个方案适用于只有少量的消费者并且每个消费者都会执行大量操作的时候这时lock并没什么太大问题但是如果是大批量短小精悍的消费者存在的话lock会严重影响代码的执行效率。 第二种方案就是我们直接用新的线程安全的集合区解决这个问题。新的线程安全的这些集合内部不再使用lock机制这种比较低效的方式去实现线程安全而是转而使用SpinWait和Interlocked等机制间接实现了线程安全这种方式的效率要高于使用lock的方式。看一下实现代码 var queue new ConcurrentQueuestring();
Task.Factory.StartNew(()
{while (true){queue.Enqueue(value count);count;}
});Task.Factory.StartNew(()
{while (true){string value;if (queue.TryDequeue(out value)){Console.WriteLine(Worker 1: value);}}
});Task.Factory.StartNew(()
{while (true){string value;if (queue.TryDequeue(out value)){Console.WriteLine(Worker 2: value);}}
}); 执行这段代码可以工作但是有点不太优雅能不能不要去判断集合是否为空集合当自己没有元素的时候自己Block一下可以吗答案当然是可以的使用BlockingCollection即可 var blockingCollection new BlockingCollectionstring();
Task.Factory.StartNew(()
{while (true){blockingCollection.Add(value count);count;}
});Task.Factory.StartNew(()
{while (true){Console.WriteLine(Worker 1: blockingCollection.Take());}
});Task.Factory.StartNew(()
{while (true){Console.WriteLine(Worker 2: blockingCollection.Take());}
}); BlockingCollection集合是一个拥有阻塞功能的集合它就是完成了经典生产者消费者的算法功能。它没有实现底层的存储结构而是使用了实现IProducerConsumerCollection接口的几个集合作为底层的数据结构例如ConcurrentBag ConcurrentStack或者是ConcurrentQueue。你可以在构造BlockingCollection实例的时候传入这个参数如果不指定的话则默认使用ConcurrentQueue作为存储结构。 而对于生产者来说只需要通过调用其Add方法放数据消费者只需要调用Take方法来取数据就可以了。 当然了上面的消费者代码中还有一点是让人不爽的那就是while语句可以更优雅一点吗答案还是肯定的 Task.Factory.StartNew(()
{foreach (string value in blockingCollection.GetConsumingEnumerable()){Console.WriteLine(Worker 1: value);}
}); GetConsumingEnumerable()方法是关键这个方法会遍历集合取出数据一旦发现集合空了则阻塞自己直到集合中又有元素了再开始遍历神奇吧。 好了到此完美了解决了生产者消费者问题。然而通常来说还有两个问题我们有时需要去控制
第一个问题控制集合中数据的最大数量。 这个问题由BlockingCollection构造函数解决构造该对象实例的时候构造函数中的BoundedCapacity决定了集合最大的可容纳数据数量这个比较简单不多说了。
第二个问题何时停止的问题。 这个问题由CompleteAdding和IsCompleted两个配合解决。 CompleteAdding方法是直接不允许任何元素被加入集合当使用了CompleteAdding方法后且集合内没有元素的时候另一个属性IsCompleted此时会为True这个属性可以用来判断是否当前集合内的所有元素都被处理完。看一下生产者修改后的代码 Task.Factory.StartNew(()
{for (int count 0; count 10; count){blockingCollection.Add(value count);}blockingCollection.CompleteAdding();
}); 当使用了CompleteAdding方法后对象停止往集合中添加数据这时如果是使用GetConsumingEnumerable枚举的那么这种枚举会自然结束不会再Block住集合这种方式最优雅也是推荐的写法。但是如果是使用TryTake访问元素的则需要使用IsCompleted判断一下因为这个时候使用TryTake会抛InvalidOperationException异常。
看一下最终的代码形式 static void Main(string[] args)
{var blockingCollection new BlockingCollectionstring();var producer Task.Factory.StartNew(() {for (int count 0; count 10; count){blockingCollection.Add(value count);Thread.Sleep(300);}blockingCollection.CompleteAdding();});var consumer1 Task.Factory.StartNew(() {foreach (string value in blockingCollection.GetConsumingEnumerable()){Console.WriteLine(Worker 1: value);}});var consumer2 Task.Factory.StartNew(() {foreach (string value in blockingCollection.GetConsumingEnumerable()){Console.WriteLine(Worker 2: value);}});Task.WaitAll(producer, consumer1, consumer2);
} BlockingCollection的枚举 此外需要注意BlockingCollection有两种枚举方法首先BlockingCollection本身继承自IEnumerableT所以它自己就可以被foreach枚举首先BlockingCollection包装了一个线程安全集合那么它自己也是线程安全的而当多个线程在同时修改或访问线程安全容器时BlockingCollection自己作为IEnumerable会返回一个一定时间内的集合片段也就是只会枚举在那个时间点上内部集合的元素。使用这种方式枚举的时候不会有Block效果。 另外一种方式就是我们上面使用的GetConsumingEnumerable方式的枚举这种方式会有Block效果直到CompleteAdding被调用为止。 最后提一下实现IProducerConsumerCollection接口的几个集合ConcurrentBag(线程安全的无序的元素集合) ConcurrentStack(线程安全的堆栈)和ConcurrentQueue(线程安全的队列)。这些都很简单功能与非线程安全的那些集合都一样只不多是多了TryXXX方法多线程环境下使用这些方法就好了其他就不多说了。 到此生产者和消费者这个经典的问题告一段落了。 System.Collections.Concurrent下面的集合除了解决生产者消费者问题外还有一些与多线程相关的集合例如
1. ConcurrentDictionary这个是键/值对字典的线程安全实现这个类在原来的基础上也添加了一下新的方法例如AddOrUpdateGetOrAddTryXXX等等都很容易理解。
2. 各种Partitioner 类提供针对数组、列表和可枚举项的常见分区策略。 若要对数据源操作进行并行化其中一个必要步骤是将源分区为可由多个线程同时访问的多个部分。 PLINQ 和任务并行库 (TPL) 提供了默认的分区程序当编写并行查询或ForEach循环时默认的分区程序以透明方式工作。 但是毫无疑问对于一些复杂的情况我们是可以插入自己的分区程序的这就是微软为我们提供的各种Partitioner类这个不多说了感兴趣的同学请自己参考一下MSDN。