首页 > Personal > Game > 游戏引擎设计系列5-线程、任务、任务队列
2018
09-25

游戏引擎设计系列5-线程、任务、任务队列

引擎为了更方便的使用多线程Thread,我们把所有需要在线程上执行的操作分解成任务Task,并且在Thread上生成一个任务队列TaskQueue,Thread根据TaskQueue的顺序和时间依次执行。需要在Thread上执行的任务,只要调用Thread上的Send和Post就可以将任务插入相应TaskQueue中,其中Send是阻塞的,Post是非阻塞的。
Thread {
static DWORD GetThreadTLSIndex() {
static struct temp {
DWORD index;
temp() { if ((index = TlsAlloc()) == TLS_OUT_OF_INDEXES) ThrowErrorString(“TlsAlloc failed”); }
~temp(){ TlsFree(index); }
} value;
return value.index;
}
/// 获取当前线程
static Thread * Current() {
return (Thread *)TlsGetValue(GetThreadTLSIndex());
}
/// Thread入口函数
static void ThreadEntry(by_ptr(IThread) thread) {
TlsSetValue(GetThreadTLSIndex(), thread.ToPointer());
thread->Ret = thread->Entry(thread->Param);
thread->Running = false;
}
/// 创建线程
static shared_ptr(Thread) CreateThread(U32 StackSize, FN_THREAD_ENTRY entry, void* pParameter, bool Suspended = false);
static void SuspendThread(by_ptr(Thread) thread); // 挂起线程
static void ResumeThread(by_ptr(Thread) thread); // 恢复线程
static void WaitThreadExit(by_ptr(Thread) thread); // 等待线程结束
static void Sleep(U32 uMilliseconds); // 睡眠
static bool Running(by_ptr(Thread) thread = NullPtr); // 是否运行
static void Quit(by_ptr(Thread) thread = NullPtr) // 退出
static temp_ptr(Thread) MainThread(); // 获取主线程
/// 初始化
static void Initialize() {
SThreadManager = new ThreadManager; // 创建线程管理器
SMainThread= ptr_new Thread(::GetCurrentThread(), ::GetCurrentThreadId()); // 创建主线程
SMainThread->LuaThread = Lua::LuaState::NewState(); 创建LuaState
SMainThread->LuaThread->OpenLibs(); 初始化Lua
}
/// 终止
static void Terminate() {
ASSERT(SMainThread.ToPointer() == Current(), “Thread::Terminate must be called from main thread!”);
SThreadManager->WaitAllThreadExit();
SMainThread->LuaThread->Close();
SMainThread->LuaThread = NULL;
if (SMainThread->MessageQueue) {
StopMessageQueue(SMainThread->MessageQueue);
SMainThread->MessageQueue = NullPtr;
}
SMainThread= NullPtr;
TlsSetValue(IThread::GetThreadTLSIndex(), NULL);
SAFE_DELETE(SThreadManager);
}

typedef U32 (*FN_THREAD_ENTRY)(void* pParameter);
FN_THREAD_ENTRY Entry; // 入口函数
void * Param; // 参数
DWORD Ret; // 返回值
shared_ptr(Object) MessageQueue; //消息队列
Lua::LuaState * LuaThread; // lua state
bool Running; // 运行参数
LinkNode *LinkNode; // 链表节点
CriticalSection ThreadLock; // 线程锁
}

static struct ThreadManager * SThreadManager = NULL; // 线程管理器
static shared_ptr(Thread) SMainThread; // 主线程

struct ThreadManager {
CriticalSection Lock;
List AllUserThreads; // 线程数组

/// 创建线程函数
shared_ptr(Thread) CreateThread(U32 stackSize, FN_THREAD_ENTRY entry, void* param, bool suspended) {
CriticalSectionLock lock(Lock);
shared_ptr(Thread) thread = ptr_new Thread(entry, param);
thread->LinkNode = AllUserThreads.PushBack(thread);
thread->Handle = ::CreateThread(NULL, stackSize, &ThreadEntryWrapper, new shared_ptr(Thread)(thread), suspended? CREATE_SUSPENDED : 0 , &thread->Id);
return thread;
}

/// 等待所有线程退出
void WaitAllThreadExit() {
Lock.Enter();
while (!AllUserThreads.Empty()){
shared_ptr(Thread) thread(AllUserThreads.Front()->data);
if (thread) {
Lock.Leave();
Thread::Quit(thread);
Thread::WaitThreadExit(thread);
Lock.Enter();
}
}
Lock.Leave();
}

/// 所有线程入口
static DWORD __stdcall ThreadEntryWrapper(void * param) {
shared_ptr(Thread) thread = *static_cast(param); delete static_cast(param);
Thread::ThreadEntryWrapper(thread);
SThreadManager->Lock.Enter();
SThreadManager->AllUserThreads.Remove(thread->LinkNode);
SThreadManager->Lock.Leave();
if (thread->MessageQueue) StopMessageQueue(thread->MessageQueue);
return thread->Ret;
}
};

class Task {
void OnDispatch(IArguments & args) // 调度事件函数

static void Send(by_ptr(Task) task, by_ptr(IThread) thread = NullPtr, IArguments & args = FixedArguments<0>());
static void Post(by_ptr(Task) task, by_ptr(IThread) thread = NullPtr);
static void Schedule(by_ptr(Task) task, F64 time = 0, by_ptr(IThread) thread = NullPtr);
static shared_ptr(Task) Receive();
static temp_ptr(Task) Peek();
static shared_ptr(Task) Current();
static void Dispatch(by_ptr(Task) task, IArguments & args = FixedArguments<0>());
static void Cancel(by_ptr(Task) task);
static void NextFrame();
static void Wait(by_ptr(Task) task);
static F32 GetFrameTime(by_ptr(IThread) thread = NullPtr);
static F64 GetTotalTime(by_ptr(IThread) thread = NullPtr);

LinkList> WaitingThreadList; // 等待信号的链表
F64 ExecuteTime; // 在什么时间执行
};

class TaskQueue {
static temp_ptr(TaskQueue) FromThread(temp_ptr(Thread) thread) // 从线程获取任务队列
static void CancelTask(by_ptr(Task) task) // 把task从链表中删除

void Wait(by_ptr(Task) task) // 等待task执行完成,会把WaitingNode加入task的WaitingThreadList,task执行时会设置信号
void Send(by_ptr(Task) task, IArguments & args) // 如果是当前线程调用DispatchTask,如果不是调用Post(task);Wait(Task);
void Post(by_ptr(Task) task) // 将task插入到DelayHeadNode之前HeadNode之后,同时设置task的ExecuteTime为当前DispatchTime
void Schedule(by_ptr(Task) task, F64 time) // 设置task的ExecuteTime为当前时间+time,根据时间在DelayTimeNode之后查找到task相应的位置并插入
void DispatchTask(by_ptr(Task) task, IArguments & args) // 执行task,并调用task的OnDispatch,只在对应Thread上调用
shared_ptr(Task) Receive() // HeadNode后有task直接返回,如果没有,需要执行Windows消息就循环执行PeekMessage、TranslateMessage、DispatchMessage,如果DelayHeadNode后面有延时任务,调用NextFrame,如果DelayTimeNode有任务,如果执行时间大于一定时间间隔Sleep一段时间然后调用NextFrame,否则直接调用NextFrame
void Stop() // 终止所有任务
void NextFrame() // 查找DelayTimeNode后所有执行时间小于当前时间的task,把他们插到DelayHeadNode之前
F32 GetFrameTime() // 获取Clock的FrameTime
F64 GetTotalTime() // 获取Clock的TotalTime

temp_ptr(Thread) Thread;
F64 DispatchTime;
TaskNode *HeadNode; // 在链表头
TaskNode *DelayHeadNode; // 初始化在HeadNode之后,
TaskNode *DelayTimeNode; // 初始化在DelayHeadNode之后
CriticalSection Lock;
LinkNode WaitingNode; // 等待信号
bool ProcessWindowsMessages; // 是否需要执行windows消息
}

/// 通过GenericTask功能类生成task, 根据参数个数的不同,使用M4推导模板
template class GenericTask : public Task{};
template<> class GenericTask : public Task {
typedef void (*FUNC)();
inline GenericTask(FUNC func): Function(func) {}
inline virtual void OnDispatch(IArguments & args) { (Function)(); }
FUNC Function;
};
template inline SharedPtr > NewGenericTask(FUNC func) {
return ptr_new GenericTask(func);
};

对于多线程的管理,其实最重要的还是数据同步的问题,多个线程对同一个资源的共同操作,这个是很难处理的问题.几个解决思路.
把对同一个资源的操作分配到同一个线程中去,比如像我们的线程系统可以扔相应的Task到指定的线程,设计时就需要注意将资源按线程分离.可能是最简单的实现方法.不过并不能实现完全的并发.
对需要操作的资源进行管理,如果只读请求直接返回(相当于复制数据),如果是写操作或者读写操作,需要阻塞资源的请求(比如类似memcached内存缓存,带着数据版本号cas请求数据写操作,cas一致才可以修改,否则失败,类似原子置换操作),这样相当于把资源的操作分离出,操作线程不需要关心实际的资源情况,可以实现并发,不过可能有些操作会阻塞,相对更智能一些.(如类似Go的channel)
但总的来说,资源的访问冲突不可避免,只能尽量想办法解决,可能更好的是从设计上想办法避免类似问题吧(比如简单数据的操作原子化等等),但你还是不能限制实际的游戏逻辑不是顺序相关的.如果为了并行,带来了实际开发的复杂度和限制,我觉得反而得不偿失了.
至少做到大的功能模块可以按功能多线程,比如动画,物理,声音,渲染,网络等,就已经可以很大程度享受多线程的优势了.这也是现在主要的商业引擎Unity Unreal的设计方式.

最后编辑:
作者:wy182000
这个作者貌似有点懒,什么都没有留下。