山东省建设人才网站,wordpress标题字体太大,江西医疗网站建设,网站建设和备案的顺序1. FQueuedThreadPool IQueuedWork
FQueuedThreadPool是UE4中抽象出的线程池。线程池由若干个Worker线程#xff0c;和一个同步队列构成。UE4把同步队列执行的任务抽象为IQueuedWork. 线程池的同步队列#xff0c;就是一个IQueuedWork的队列了。借用wiki上线程池的图,…1. FQueuedThreadPool IQueuedWork
FQueuedThreadPool是UE4中抽象出的线程池。线程池由若干个Worker线程和一个同步队列构成。UE4把同步队列执行的任务抽象为IQueuedWork. 线程池的同步队列就是一个IQueuedWork的队列了。借用wiki上线程池的图, UE4的FQueuedThreadPool也是如图中所示的结构 Thread pool
生产者生产IQueuedWork的实例对象。线程池会向生产者提供入队的接口。线程池中的Worker线程都是消费者会不停地从队列中取出IQueuedWork并执行work.
下面的代码就是FQueuedThreadPool给用户使用的接口
class CORE_API FQueuedThreadPool
{
public:// 创建线程池指定线程数还有每个worker栈大小及优先级virtual bool Create( uint32 InNumQueuedThreads, uint32 StackSize, EThreadPriority ThreadPriority) 0;// 销毁线程池对Task Queue和每个worker线程执行清理操作virtual void Destroy() 0;// 生产者使用这个接口向同步队列添加IQueuedWorkvirtual void AddQueuedWork(IQueuedWork* InQueuedWork) 0;// 生产者使用这个接口尝试删除一个IQueuedWorkvirtual bool RetractQueuedWork(IQueuedWork* InQueuedWork) 0;// 获取线程池中worker线程的数目virtual int32 GetNumThreads() const 0;
};需要提及的是RetractQueuedWork接口只能尝试去删除或取消一个work对象。如果work不在队列当中或者请求删除时已经在执行和执行完成都无法取消。
IQueuedWork是同步队列中任务对象的抽象。代码如下
class IQueuedWork
{
public:virtual void DoThreadedWork() 0;virtual void Abandon() 0;
};IQueuedWork的接口很简单我们只需要实现代码中的两个接口分别是任务的执行流程和废弃当前任务的接口。
2. FQueuedThread
FQueuedThread就是线程池worker线程的实现了。它是一个FRunnable的实现类并内聚了一个FRunnableThread的实例对象。
class FQueuedThread : public FRunnable
{
protected:FRunnableThread* Thread;virtual uint32 Run() override;
};FQueuedThread实现的Run函数就是类似上一篇我们实现的MyRunnable的空闲等待的流程。我们回顾一下实现所需的部件
一个原子布尔变量作为循环的标识位一个FEvent用来让线程在无任务可做时挂起而不占用系统资源
按照上面的思路我们继续补完代码
class FQueuedThread : public FRunnable
{
protected:FEvent* DoWorkEvent;TAtomicbool TimeToDie;FRunnableThread* Thread;virtual uint32 Run() override{while(TimeToDie.Load(EMemoryOrder::Relaxed)){DoWorkEvent-Wait();// TODO ... do work}}
};这样的实现有很严重的缺陷。无穷时间的等待线程被挂起后UE4无法获取这些线程的状态了。因此UE4采用的是等待10ms再check是否继续等待。
while(TimeToDie.Load(EMemoryOrder::Relaxed))
{bool bContinueWaiting true; while(bContinueWaiting){DECLARE_SCOPE_CYCLE_COUNTER(...); // record statusbContinueWaiting !DoWorkEvent-Wait( 10 );}// TODO ... do work
}被唤醒后意味着两种情况
新的任务分配下来有活干了线程池发出清理指令线程即将退出
把执行Work的代码加入如下所示
class FQueuedThread : public FRunnable
{
protected:FEvent* DoWorkEvent;TAtomicbool TimeToDie;FRunnableThread* Thread;IQueuedWork* volatile QueuedWork;virtual uint32 Run() override{while(TimeToDie.Load(EMemoryOrder::Relaxed)){bool bContinueWaiting true; while(bContinueWaiting){DECLARE_SCOPE_CYCLE_COUNTER(...); // record statusbContinueWaiting !DoWorkEvent-Wait( 10 );}IQueuedWork* LocalQueuedWork QueuedWork;QueuedWork nullptr;FPlatformMisc::MemoryBarrier();check(LocalQueuedWork || TimeToDie.Load(EMemoryOrder::Relaxed));while (LocalQueuedWork){LocalQueuedWork-DoThreadedWork();LocalQueuedWork OwningThreadPool-ReturnToPoolOrGetNextJob(this);} }return 0}
};QueuedWork就是需要执行的work对象的指针它被volatile修饰说明还有其他的线程会修改这个指针防止编译器生成直接从缓存中读取代码的优化。check方法明显地指明了被唤醒时只有前面提及的两种情况。如果Work不为空则调用IQueuedWork的DoThreadedWork接口。任务完成后的下一行代码就是向所属线程池的同步队列再申请一个任务。如果队列中有任务则继续执行新的任务。若队列已经为空则将线程归还到线程池。线程池有一个QueuedThreads成员记录线程池中的空闲的线程。
个人觉得UE4在check之后的实现略有不妥。在同时有Work要执行和TimeToDie为true时UE4选择了继续执行完Work再退出。笔者认为TimeToDie为true时应该放弃执行当前的work直接退出。当然这里不同的策略差别也不大也不重要。
还有一个重要的函数就是FQueuedThread::DoWork. 它是由生产者调用线程池的AddQueuedWork线程池对象在进行调度的时候调用的。DoWork函数代码如下
void FQueuedThread::DoWork(IQueuedWork* InQueuedWork)
{// ...QueuedWork InQueuedWork;FPlatformMisc::MemoryBarrier();// Tell the thread to wake up and do its jobDoWorkEvent-Trigger();
}值得提及的是两个函数中的内存屏障代码FPlatformMisc::MemoryBarrier(). DoWork中会对QueuedWork进行写操作而在Run函数中会对QueuedWork进行读操作而且DoWork与Run发生在不同的线程这样就产生了竞争条件race condition. 一般的情况是上一个mutex lock而UE4却没有只使用了内存屏障。原因是这个竞争条件发生的时候有且仅有一个线程写有且仅有一个线程读并且DoWork中的DoWorkEvent-Trigger()发出一个事件告知已经准备好一个IQueuedWork一定发生在Run函数中读取IQueuedWork之前。所以UE4使用内存屏障来保证顺序一致性让Run函数从另外一个线程读取IQueuedWork时能够读取到已经同步过后的值。关于无锁编程大家感兴趣可以上purecpp的相关专题一起讨论。
3. FQueuedThreadPoolBase
再来看看线程池的实现类。FQueuedThreadPool的实现类只有一个就是FQueuedThreadPoolBase类。我们从它的数据成员可以很清晰地可以看出该线程池的结构与第一节的所示的线程池的结构图是基本吻合的
class FQueuedThreadPoolBase : public FQueuedThreadPool
{
protected:/** The work queue to pull from. */TArrayIQueuedWork* QueuedWork;/** The thread pool to dole work out to. */TArrayFQueuedThread* QueuedThreads;/** All threads in the pool. */TArrayFQueuedThread* AllThreads;/** The synchronization object used to protect access to the queued work. */FCriticalSection* SynchQueue;/** If true, indicates the destruction process has taken place. */bool TimeToDie;// ....
}数组QueuedWork和互斥锁SynchQueue组成了一个线程安全的同步队列。AllThreads管理着全部的worker线程。TimeToDie是标识线程池生命状态如果置为true线程池的清理工作正在进行或者已经进行完毕了。还有一个QueuedThreads成员它管理着空闲的线程也就是上一节FQueuedThread归还自己到线程池的空闲队列。
线程池的创建会依次创建每个worker线程。线程池销毁的时候会依次向每个worker线程发出销毁的命令并等待线程退出。线程池的销毁会放弃还未执行的work. 创建和销毁的流程较为简单就不详细展开了。后文着重讨论生产者向线程池添加work的流程。
生产者创建了一个IQueuedWork实现对象后会调用第一节提及的AddQueuedWork接口向线程池添加要执行的work. UE4控制线程池添加work的流程实现的较为精细。它将线程池的状态分成了两类来分别处理。这两种状态分别为 1. 线程池中还有空闲线程即QueuedThreads不为空并且QueuedWork一定为空 2. 线程池中已经没有空闲的线程即QueuedThreads为空
第一个情景的处理策略是从空闲线程数组中取一个线程并直接唤醒该线程执行由生产者当前传递进来的work. 第二个情景较为简单由于没有空闲线程可用就直接将work入队即可。
void FQueuedThreadPoolBase::AddQueuedWork(IQueuedWork* InQueuedWork) /*override*/
{// ....FQueuedThread* Thread nullptr;{FScopeLock sl(SynchQueue);const int32 AvailableThreadCount QueuedThreads.Num();if (AvailableThreadCount 0){// situation 2QueuedWork.Add(InQueuedWork);return;}// situation 1const int32 ThreadIndex AvailableThreadCount - 1;Thread QueuedThreads[ThreadIndex];QueuedThreads.RemoveAt(ThreadIndex, 1, false);}Thread-DoWork(InQueuedWork);
}UE4处理情景一的实现有两个优点。
第一UE4并不是简单地让每个线程抢占任务队列中work. 而是在当有空闲线程的时候小心地获取一个空闲线程指定work并唤醒这一个线程。这样做的好处是不会出现惊群效应而让CPU浪费时间做无用的线程调度。
第二从代码中可以看出UE4每次获取空闲线程都是取数组的最末尾的空闲线程也就是最近归还的work线程。这样做的好处是最近归还的线程意味着它相比其他空闲线程是更近期使用过的。它有更大的概率操作系统还未对它进行context切换或者它的context数据还留存在缓存当中。优先使用该线程就有更大的概率获取较为低廉的线程切换开销。
最后线程池为worker线程提供的从线程池获取下一个可用的work和归还空闲线程的接口ReturnToPoolOrGetNextJob函数:
IQueuedWork* FQueuedThreadPoolBase::ReturnToPoolOrGetNextJob(FQueuedThread* InQueuedThread) /*override*/
{// ... omitted codesIQueuedWork* Work nullptr;FScopeLock sl(SynchQueue);// ... omitted codesif (QueuedWork.Num() 0){Work QueuedWork[0];QueuedWork.RemoveAt(0, 1, false);}if (!Work)QueuedThreads.Add(InQueuedThread);return Work;
}当任务队列中还有work时就从队列头部取出一个是一个FIFO的同步队列。当任务队列为空无法取出新的任务时线程就将自己归还给到线程池中标记为空闲队列。UE4这里实现的不太妥当的就是QueuedWork是一个TArrayIQueuedWork*数组。数组对非尾部元素的Remove操作是会对数组元素进行移动的。虽然移动指针并不是很昂贵而且UE4也禁止了Remove导致的shrink操作但开销依然是存在的。这里最好的方案是使用一个可以扩容的环状队列。
4. 小结
本文讨论了UE4中线程池的实现细节。线程池FQueuedThreadPool的实现是由一个元素为IQueuedWork*的同步队列及若干个worker线程所组成。UE4中的线程池将IQueuedWork队列化并用FIFO的调度策略。线程池为IQueuedWork的生产者提供了入队接口并为worker线程消费者提供了获取出队接口。UE4对线程池的性能优化也做了不少的工作。例如避免线程池抢占IQueuedWork时可能会发生的惊群现象以及取最近使用的线程还有无锁编程等。
专题的下一篇我们将讨论UE4中的AsyncTask. 这也是UE4迈向现代C设计的有力步伐。