【UE】 UE 多线程框架

2022-10-29  本文已影响0人  离原春草

最近有同学问到在UE中要想处理一个耗时20+ms的任务,要怎么做?考虑到这个任务是运行时执行的,为了避免导致卡顿,两个思路是:

  1. 分帧执行

  2. 异步调用

分帧执行的目的是化大为小,分成多帧来执行,由于这个任务不太方便拆解,因此这个思路可行性不高,唯一的方式就是异步执行了,然而由于对UE的多线程逻辑不是非常清晰,因此无法给出非常具体的实现方案,只能提供思路,这里为了补足对这一块的理解,抽空对UE的多线程框架做了学习,并梳理出这篇文章,后续会随着工作中的深入来对这篇文章进行不断迭代与完善。

1. 基本介绍

UE自己实现了一套多线程机制,这套机制包含了众多的异步方法实现(参考后面的介绍),此外,由于UE是支持C++11的,所以使用std::thread也是支持的。

2. 使用方式

UE的异步(多线程)执行有两种主要的方式,一种是基于异步类型的,一种则是基于全局异步方法的。

2.1 异步类型

基于类型的异步执行又可以分成三种:

  1. 继承FRunnable接口创建单个线程

    使用案例可以参考FAsyncWriter,FUdpSocketReceiver以及FTcpListener,这里也给出一个大致的实现代码:

class FRunnableTest : public FRunnable
    {
     virtual uint32 Run() override
     {
     UE_LOG(LogTemp, Log, TEXT("[FRunnableTest::Run] Start"));
     FPlatformProcess::Sleep(30);
     UE_LOG(LogTemp, Log, TEXT("[FRunnableTest::Run] End"));
     return 0;
     }
    
    };
    
    FRunnable* RunnablePtr = new FRunnableTest();
    TestThread = FRunnableThread::Create(RunnablePtr, TEXT("Just For Test"));

简单解释一下实现逻辑,UE抽象了一种基于可执行体 + 执行线程载体的组合方式,其中可执行体指的是FRunnable,而执行线程载体则是FRunnableThread,FRunnableThread的工作有:

在上面的代码里:我们直接继承自FRunnable创建了一个新的可执行体FRunnableTest,并对Run方法做了重载(实际上还有其他的函数也需要做重载,不过这里就不展示了)。

在需要进行异步执行的时候,会通过FRunnableThread的静态方法Create创建一个执行线程载体FRunnableThread,这个线程中执行的任务就是刚刚定义的执行体的Run方法。

  1. 创建AsyncTask来调用线程池里面空闲的线程
    这是一种基于线程池的异步任务处理系统,这套系统同样是基于Runnable实现的,在实际工作中,我们经常会遇到需要将部分代码放在特定的线程中进行执行,而这种问题就可以通过这种方式来解决:
if(IsInGameThread())
    {
     //….一些操作
    }
    else
    {
     AsyncTask(ENamedThreads::GameThread, [=]()
     {
     //….一些操作
     });
    }
  1. 通过TaskGraph系统来异步完成一些自定义任务
    这是一套抽象的异步任务处理系统,通过这套系统,我们可以创建多个多线程任务,并且指定各个任务之间的依赖关系,并按照该关系来依次处理任务,所有任务依赖关系形成一张有向无环图。

2.2 全局异步方法

UE还提供了若干个用于实现异步(多线程)执行的全局函数:

  1. AsyncTask
    这个方法调用GraphTask创建了一个立刻执行的任务,可以看成是是TaskGraph的简单版本。
    这个需要执行的任务可以指定执行的线程,需要注意的是,如果某个任务从AnyThread改成GameThread执行,AsyncTask下面的代码也是不会阻塞的。这个时候还是单线程,只是传入的Lambda方法会在主线程一帧里的其他地方调用,GameThread执行由于没有线程切换,因此整体时间消耗会少于AnyThread,不过问题则在于加长了整体的单帧时间消耗。

  2. Async
    当我们在进行异步调用的时候需要有返回值和回调函数的时候通常会使用Async方法,不过这个方法的性能也较差,因此如果不是十分必要,不要考虑这个方法。

    这个方法的第一个参数是EAsyncExecution,指定了任务执行的方式:

/**
     * Enumerates available asynchronous execution methods.
     */
    enum class EAsyncExecution
    {
     /** Execute in Task Graph (for short running tasks). */
     TaskGraph,
    
     /** Execute in Task Graph on the main thread (for short running tasks). */
     TaskGraphMainThread,
    
     /** Execute in separate thread if supported (for long running tasks). */
     Thread,
    
     /** Execute in separate thread if supported or supported post fork (see FForkProcessHelper::CreateThreadIfForkSafe) (for long running tasks). */
     ThreadIfForkSafe,
    
     /** Execute in global queued thread pool. */
     ThreadPool,
    
    #if WITH_EDITOR
     /** Execute in large global queued thread pool. */
     LargeThreadPool
    #endif
    };

进入到这个方法的内部实现:

/**
     * Execute a given function asynchronously.
     *
     * Usage examples:
     *
     *  // using global function
     *  int TestFunc()
     *  {
     *      return 123;
     *  }
     *
     *  TUniqueFunction<int()> Task = TestFunc();
     *  auto Result = Async(EAsyncExecution::Thread, Task);
     *
     *  // using lambda
     *  TUniqueFunction<int()> Task = []()
     *  {
     *      return 123;
     *  }
     *
     *  auto Result = Async(EAsyncExecution::Thread, Task);
     *
     *
     *  // using inline lambda
     *  auto Result = Async(EAsyncExecution::Thread, []() {
     *      return 123;
     *  }
     *
     * @param CallableType The type of callable object.
     * @param Execution The execution method to use, i.e. on Task Graph or in a separate thread.
     * @param Function The function to execute.
     * @param CompletionCallback An optional callback function that is executed when the function completed execution.
     * @return A TFuture object that will receive the return value from the function.
     */
    template<typename CallableType>
    auto Async(EAsyncExecution Execution, CallableType&& Callable, TUniqueFunction<void()> CompletionCallback = nullptr) -> TFuture<decltype(Forward<CallableType>(Callable)())>
    {
     using ResultType = decltype(Forward<CallableType>(Callable)());
     TUniqueFunction<ResultType()> Function(Forward<CallableType>(Callable));
     TPromise<ResultType> Promise(MoveTemp(CompletionCallback));
     TFuture<ResultType> Future = Promise.GetFuture();
    
     switch (Execution)
     {
     case EAsyncExecution::TaskGraphMainThread:
     // fallthrough
     case EAsyncExecution::TaskGraph:
     {
     TGraphTask<TAsyncGraphTask<ResultType>>::CreateTask().ConstructAndDispatchWhenReady(MoveTemp(Function), MoveTemp(Promise), Execution == EAsyncExecution::TaskGraph ? ENamedThreads::AnyThread : ENamedThreads::GameThread);
     }
     break;

     case EAsyncExecution::Thread:
     if (FPlatformProcess::SupportsMultithreading())
     {
     TPromise<FRunnableThread*> ThreadPromise;
     TAsyncRunnable<ResultType>* Runnable = new TAsyncRunnable<ResultType>(MoveTemp(Function), MoveTemp(Promise), ThreadPromise.GetFuture());

     const FString TAsyncThreadName = FString::Printf(TEXT("TAsync %d"), FAsyncThreadIndex::GetNext());
     FRunnableThread* RunnableThread = FRunnableThread::Create(Runnable, *TAsyncThreadName);
    
     check(RunnableThread != nullptr);
     check(RunnableThread->GetThreadType() == FRunnableThread::ThreadType::Real);
    
     ThreadPromise.SetValue(RunnableThread);
     }
     else
     {
     SetPromise(Promise, Function);
     }
     break;
    
     case EAsyncExecution::ThreadIfForkSafe:
     if (FPlatformProcess::SupportsMultithreading() || FForkProcessHelper::IsForkedMultithreadInstance())
     {
     TPromise<FRunnableThread*> ThreadPromise;
     TAsyncRunnable<ResultType>* Runnable = new TAsyncRunnable<ResultType>(MoveTemp(Function), MoveTemp(Promise), ThreadPromise.GetFuture());
    
     const FString TAsyncThreadName = FString::Printf(TEXT("TAsync %d"), FAsyncThreadIndex::GetNext());
     FRunnableThread* RunnableThread = FForkProcessHelper::CreateForkableThread(Runnable, *TAsyncThreadName);
    
     check(RunnableThread != nullptr);
     check(RunnableThread->GetThreadType() == FRunnableThread::ThreadType::Real);
    
     ThreadPromise.SetValue(RunnableThread);
     }
     else
     {
     SetPromise(Promise, Function);
     }
     break;
    
     case EAsyncExecution::ThreadPool:
     if (FPlatformProcess::SupportsMultithreading())
     {
     GThreadPool->AddQueuedWork(new TAsyncQueuedWork<ResultType>(MoveTemp(Function), MoveTemp(Promise)));
     }
     else
     {
     SetPromise(Promise, Function);
     }
     break;
    
    #if WITH_EDITOR
     case EAsyncExecution::LargeThreadPool:
     if (FPlatformProcess::SupportsMultithreading())
     {
     GLargeThreadPool->AddQueuedWork(new TAsyncQueuedWork<ResultType>(MoveTemp(Function), MoveTemp(Promise)));
     }
     else
     {
     SetPromise(Promise, Function);
     }
     break;
    #endif
    
     default:
     check(false); // not implemented yet!
     }
    
     return MoveTemp(Future);
    }

可以看到,这是一个模板函数,其中实现中有两个C++11的关键字:

template <typename T>
void function(T&& t) 
{
  otherdef(forward<T>(t));
}

万能引用

template <typename T>
void function(T&& t) 
{
  otherdef(t);
}

此模板函数的参数 t 既可以接收左值,也可以接收右值,这里在实际使用中,会需要用到一个叫做引用折叠的概念,C++ 11标准为了更好地实现完美转发,特意为模板函数中的参数指定了新的类型匹配规则:

这里给出Async方法使用的一个简单示例代码:

// 使用全局函数
int TestFunc()
{
return 123;
}

TFunction<int()> Task = TestFunc();
auto Result = Async(EAsyncExecution::Thread, Task);

// 使用lambda
TFunction<int()> Task = []()
{
return 123;
}

auto Result = Async(EAsyncExecution::Thread, Task);


// 使用inline lambda
auto Result = Async<int>(EAsyncExecution::Thread, []() {
return 123;
}

此外,值得一提的是,Async方法的亮点在于其返回了一个TFuture<T>值:

  1. 这个值可以获得Lambda返回值,即通过调用Get函数即可得到,不过这种方法虽然可以获得返回值,但是是会造成主线程阻塞,这个问题可以通过在Tick里调用FutureResult.IsReady,等它准备好了再调用Get获取返回值。此外,当没有返回值的时候,它的主线程执行时长是稍差于TaskGraph和AsyncTask的。
  2. 这个值也可以判断Lambda的逻辑有没有执行完
  3. 这个值还支持执行完成的函数回调

基于Async的函数,我们有两个函数变种:

  1. ParallelFor
    这个方法本质是通过TaskGraph创建了多个Task并行执行任务,不过实测发现,这种方式的执行时间非常慢(时间消耗是不使用这种方法的4~5倍),因此如果不是特别复杂的逻辑,不建议使用ParallelFor。

3. 实现原理

3.1 FRunnable方式

这种方式的核心包含三个结构,分别是FRunnable、FRunnableThread以及FThreadManager

FRunnable

这是一个在线程上执行的对象的封装,或者说交付给线程执行的函数体的封装,可以理解成是线程的数据。

在实际使用中,FRunnable会被作为参数传入FRunnableThread,FRunnableThread会在特定时机调用其Run接口。

考虑到硬件不支持多线程的情况,提供了GetSingleThreadInterface接口,在单线程情况下,引擎会通过这个接口返回对象的Tick完成任务驱动。

这个结构包含了如下几个关键的方法:

FRunnableThread

这是线程对象基类,会用来驱动FRunnable。

这个类型是平台无关的线程对象的抽象,不同平台的线程都会继承自他:

这个结构包含了如下的一些方法:

FRunnableThread有若干个子类,其中FRunnableThreadWin是Windows平台的线程;FFakeThread则是伪线程结构,这类任务后续会在主线程中通过Tick进行驱动。

FThreadManager

这是一个单例,凡是通过FRunnableThread创建的线程都是需要通过FThreadManager进行统一管理的,此外内部还维护着一个线程ID到FFakeThread对象的TMap,包含了如下的一些方法:

3.2 AsyncTask方式

这种方式同样包含了若干个关键的类型,下面一一进行介绍。

FQueuedThreadPool

队列线程池对象基类,规范了线程池接口,整套线程池是基于FRunnable和FRunnableThread实现的。

一个问题是,我们为什么需要线程池呢?这是因为线程过多会带来调度开销,进而影响缓存局部性和整体性能,频繁创建和销毁线程也会带来极大的开销,而我们更加关心的是任务可以并发执行,并不想管理线程的创建,销毁和调度。通过将任务处理成队列,交由线程池统一执行,可以提升任务的执行效率。

线程池由若干个Worker线程,和一个同步队列构成:

引擎初始化(FEngineLoop::PreInit)时,如果启用了多线程模式,则会创建下面四种线程池(按顺序):

FQueuedThreadPool的方法可以大致分为两类:

这种类型有一个实现子类是FQueuedThreadPoolBase,下面会有介绍。

IQueuedWork

这是可在线程池中运行的任务基类,同时也是对同步队列执行的任务抽象,值得注意的是,基本上所有的异步任务统一都继承至 IQueuedWork。包含IQueuedWork::DoThreadedWork与IQueuedWork::Abandon,子类实现有FAsyncTask,FAutoDeleteAsyncTask与FAsyncEncode。

FQueuedThread

这是线程池worker线程的实现(是一个FRunnable的实现类),内部包含一个FRunnableThread的实例对象。

提供了工厂方法Create,这个方法会创建一个线程对象并运行,通常和FQueuedThreadPool结合使用。

包含如下的成员变量:

FQueuedThreadPoolBase

这是队列线程池的实现类,父类为FQueuedThreadPool,维护一个IQueuedWork任务队列和FQueuedThread线程列表,工作过程可以描述为:

这个类型包含了如下的一些成员:

除此之外,有几个方法需要关注一下:

FAsyncTask

这是一个模板类,需要将要执行的任务作为模板参数传入,在DoWork接口中完成任务的执行。

基于FQueuedThreadPool完成线程管理,在FAsyncTask::Start中完成QueuedWork到QueuedPool的添加,在此函数中如果传入StartSynchronousTask参数为true,也可以走同步执行。

调用方法有StartBackgroundTask与StartSynchronousTask的区别:

StartSynchronousTask存在的目的是为多线程代码提供灵活性:当我们在使用多线程时发现部分逻辑代码只能跑在主线程或者它跑异步线程其实并没有变快,这个时候想把它改成单线程的时候就很方便。

FAutoDeleteAsyncTask

这个类型与FAsyncTask类似,都是模板类,其父类都是IQueuedWork,任务都是作为模板参数,都是基于FQueuedThreadPool完成线程管理,不同的是,在任务完成后会通过线程池的Destroy函数删除自身或者在执行DoWork后删除自身,而FAsyncTask需要手动delete。

线程池的Task类

线程池的Task类是可以传入FAsyncTask用作具体的任务逻辑对应的类型,这个类型需要实现DoWork接口。这里可以参考FAsyncReleaseFbxScene中的实现,FAsyncReleaseFbxScene继承自FNonAbandonableTask。

继承自FNonAbandonableTask的Task表明任务不可以放弃,必须执行完成,具体而言,这类任务不可以在执行阶段终止,即使中途执行了Abandon函数也会去触发DoWork函数。

这里的一个疑问是,我们为什么要继承FNonAbandonableTask?这是因为当线程池被销毁的时候,会调用Abandon函数,而继承FNonAbandonableTask的话这个时候就不会丢弃而且等待执行完;当然,如果需要丢弃,则就不要继承,并且自己实现CanAbandon和Abandon函数。源码里可丢弃的任务参考:FAsyncStatsFile。

3.3 TaskGraph方式

这种方式有如下的一些类型数据:

TaskGraph的任务类

这类class需要手动定义,无需继承特定接口。需要声明DoTask函数来表示要执行的任务内容,GetDesiredThread函数来表示要在哪个线程上面执行:

下面给出几个任务类的示例:

FGraphEvent

这是任务依赖关系工具类,维护了依赖该Event的所有FBaseGraphTask列表,并用来传递任务完成状态,如果某个任务完成了,就会将其完成的事件传递给下游,这就是FGraphEvent的主要职责,此外,FGraphEventRef是FGraphEvent的指针。

包含如下成员变量:

线程类

FTaskGraphImplementation

这是一个单例,是task graph系统的核心部分,其父类是FTaskGraphInterface(Task Graph System的接口类型)。Task的控制、创建与分配等逻辑都是通过这个单例来完成的。

初始化在FEngineLoop.PreInit里面进行,会默认构建24个FWorkerThread工作线程(这里支持最大的线程数量也就是24):

Runnable线程创建根据类型的不同,会有不同的时机:

FWorkerThread

这个类型并非真正线程,而是对线程对象的封装,包含了如下两个成员变量:

FTaskThreadBase

这个类型用于管理当前任务的FTaskThread,用于执行任务的线程基类(线程可执行对象),继承自FRunnable与FSingleThreadRunnable,子类包含有FTaskThreadAnyThread与FNamedTaskThread。

FTaskThreadAnyThread是无名Task线程执行体,TaskGraph子系统初始化时新创建的工作线程,分为高、中、低(Linux平台Nice值分别为3、5、10)三种优先级。已创建未使用的线程属于此类,放到此线程的任务,执行顺序按照优先顺序逐个进行,优先顺序可以调整,具体的执行顺序可以通过IncomingAnyThreadTasks数组得到。内部维护一个任务队列,提供接口循环执行队列中的任务。

FNamedTaskThread是有名字的Task线程执行体,非TaskGraph子系统内部创建的线程多属于此类,比如GameThread,RenderThread都属于此列。放到此线程的任务,执行顺序按照优先顺序逐个进行,优先顺序不可调整,具体而言是通过FThreadTaskQueue来处理执行顺序。同样内部会维护任务队列,提供接口循环执行队列中的任务。

4. 线程同步

当多个线程共享相同的内存时,需要确保每个线程看到一致的数据视图,当多个线程共享相同的内存时,需要确保每个线程看到一致的数据视图,如果变量是只读的,多个线程同时读取该变量也不会有一致性问题,当某个线程可以修改变量,而其他线程也可以读取或者修改这个变量的时候,就需要对这些线程进行同步,以确保它们在访问变量的存储内容时不会访问到无效的数值。

UE提供了多种同步机制

Atomics 原子机制

这种机制可以用来保证CPU在读取和写入内存时总线操作是不可分割的,即任意线程对变量的一次操作结束后,其他线程读取到的都是最新的(已经写入完成的)数据。基于这个机制,可以进行比较快的进行比较和解锁操作。

class FThreadSafeCounter{
public:
int32 Add( int32 Amount ) {
 return FPlatformAtomics::InterlockedAdd(&Counter, Amount);
 }
private:
 volatile int32 Counter; // 因为值可能以编译器无法预测的异步方式被改变,声明为volatile禁用优化
};

UE4封装了一系列基础的原子操作,window平台下对应的是FWindowsPlatformAtomics类,里面包括读写,加减,与或,以及原子编程的核心方法 compare and swap(CAS),CompareAndSwap是实现无锁数据结构的最基本的操作,UE中的接口为:InterlockedCompareExchange(volatile int64* Dest, int64 Exchange, int64 Comparand),当Dest指向的值和Comparand相等时,将Exchange存到Dest指向的地址,否则什么也不做,这个接口可以用于实现基于循环的自旋锁,可以参考FLockFreePointerFIFO 与FLockFreePointerLIFO 类型中的实现,下面也给出一个示例代码:

while(true)
{
   TDoublePtr Local;
   Local.AtomicRead(Current); // 需要保证读取操作也是原子的,所以用AtomicRead
   TDoublePtr New;
   New.SetPtr(Item); // 新的指针赋值
   if (Current.InterlockedCompareExchange(New, Local))
   {
       break;
   }
}

在Local.AtomicRead(Current)语句中,我们通过原子操作完成Current到Local的赋值。

在Current.InterlockedCompareExchange(New, Local)语句中,尝试将New赋值给Current,但是担心Current在其他线程中被修改,这里通过InterlockedCompareExchange判断Local跟Current是否相等,如果相等就进行赋值,如果不等就继续循环,完成再次赋值,从而排除当前线程被其他线程的干扰。不过这里无法解决ABA的问题,即其他线程改完又改回原值,此时无法检测到,为了解决这个问题,UE4选择了一种比较简单常用的方法,就是给每个需要操作的值加标记,每次操作该值之前都给标记加1,这样CAS操作检测的时候就算值相等,标记不相等也不能返回true。

Locking 锁机制

UE提供了四种不同的锁:

Signaling 信号机制

类型为FSemaphore,是一种信号量与互斥锁类型,这种类型包含了一种信号机制,但不是所有平台都支持。更加常用的线程间通信机制是 FEvent

Waiting

这是一种等待同步机制,包含如下的一些事件类型:

5. 其他

使用的一些tips:

常见的线程安全类有:

6. 参考

[1]. UE4的多线程——无锁LockFree

[2]. UE4异步编程专题 - 线程池FQueuedThreadPool

[3]. 《Exploring in UE4》多线程机制详解[原理分析]

[4]. UE多线程框架

[5]. [原创]UE基础—多线程(一)

[6]. UE4异步操作总结

[7]. UE4异步编程专题 - 多线程

[8]. UE4 C++基础 - 多线程

[9]. 【UE·引擎篇】Runnable、TaskGraph、AsyncTask、Async多线程开发指南

[10]. C++11完美转发及实现方法详解

[11]. 关于网络编程中惊群效应那点事儿

上一篇 下一篇

猜你喜欢

热点阅读