CMCore/inc/hgl/thread/Workflow.h

372 lines
11 KiB
C
Raw Normal View History

2019-08-19 19:19:58 +08:00
#ifndef HGL_WORKFLOW_INCLUDE
#define HGL_WORKFLOW_INCLUDE
#include<hgl/thread/Thread.h>
#include<hgl/thread/ThreadMutex.h>
#include<hgl/thread/SwapData.h>
#include<hgl/thread/DataPost.h>
#include<hgl/type/List.h>
namespace hgl
{
/**
* <br>
* 线线<br>
* 线<br>
*
*/
namespace workflow
{
/**
*
* @param W
*/
template<typename W> class WorkProc
{
public:
virtual ~WorkProc()=default;
public: //投递工作线程所需调用的方法
virtual void Post(W *w)=0; ///<投递一个工作
virtual void Post(W **w,int count)=0; ///<投递一批工作
public: //需用户重载实现的真正执行工作的方法
/**
*
*/
virtual void OnWork(const uint,W *)=0;
public: //由工作线程调用的执行工作事件函数
/**
*
*/
virtual bool OnExecuteWork(const uint)=0;
};//template<typename W> class WorkProc
/**
* <br>
* 线线
*/
template<typename W> class SingleWorkProc:public WorkProc<W>
{
public:
using WorkList=List<W *>;
private:
SemSwapData<WorkList> work_list; ///<工程列表
protected:
double time_out;
public:
SingleWorkProc()
{
time_out=5;
}
virtual ~SingleWorkProc()=default;
void SetTimeOut(const double to) ///<设置超时时间
{
if(to<=0)time_out=0;
else time_out=to;
}
virtual void Post(W *w) override ///<投递一个工作
{
WorkList &wl=work_list.GetPost();
wl.Add(w);
work_list.ReleasePost();
}
virtual void Post(W **w,int count) override ///<投递一批工作
{
WorkList &wl=work_list.GetPost();
wl.Add(w,count);
work_list.ReleasePost();
}
virtual void ToWork() ///<将堆积的工作列表发送给工作线程
{
work_list.PostSem(1);
}
public:
/**
* 使
*/
virtual void OnFinish(const uint wt_index)
{
}
/**
*
*/
virtual bool OnExecuteWork(const uint wt_index) override
{
//为什么不使用TrySemSwap使用TrySemSwap固然会立即返回结果但会引起线程频繁刷新造成CPU的流费。
//使用WaitSemSwap目前唯一坏处是在退出时需要等待超时时间。
if(!work_list.WaitSemSwap(time_out))
return(false);
WorkList &wl=work_list.GetReceive();
const int count=wl.GetCount();
if(count>0)
{
W **p=wl.GetData();
for(int i=0;i<count;i++)
{
this->OnWork(wt_index,*p);
++p;
}
this->OnFinish(wt_index);
wl.ClearData();
}
return(true);
}
};//template<typename W> class SingleWorkProc:public WorkProc<W>
/**
* <br>
* 线线
*/
template<typename W> class MultiWorkProc:public WorkProc<W>
{
protected:
SemDataPost<W> work_list; ///<工程列表
protected:
double time_out;
public:
MultiWorkProc()
{
time_out=5;
}
virtual ~MultiWorkProc()=default;
void SetTimeOut(const double to) ///<设置超时时间
{
if(to<=0)time_out=0;
else time_out=to;
}
virtual void Post(W *w) override ///<投递一个工作
{
if(!w)return;
work_list.Post(w);
work_list.PostSem(1);
}
virtual void Post(W **w,int count) override ///<投递一批工作
{
if(!w||count<=0)return;
work_list.Post(w,count);
work_list.PostSem(count);
}
public:
/**
*
*/
virtual bool OnExecuteWork(const uint wt_index) override
{
//为什么不使用TrySemReceive使用TrySemReceive固然会立即返回结果但会引起线程频繁刷新造成CPU的流费。
//使用WaitSemReceive目前唯一坏处是在退出时需要等待超时时间。
W *obj=work_list.WaitSemReceive(time_out);
if(!obj)
return(false);
this->OnWork(wt_index,obj);
return(true);
}
};//template<typename W> class MultiWorkProc:public WorkProc<W>
/**
* 线
*/
template<typename W> class WorkThread:public Thread
{
protected:
using WorkList=List<W *>;
WorkProc<W> *work_proc;
uint work_thread_index;
bool force_close;
public:
WorkThread(WorkProc<W> *wp)
{
work_proc=wp;
work_thread_index=0;
force_close=false;
}
#ifndef _DEBUG
virtual ~WorkThread()=default;
#else
virtual ~WorkThread()
{
LOG_INFO(U8_TEXT("WorkThread Destruct [")+thread_addr_string+U8_TEXT("]"));
}
#endif//_DEBUG
bool IsExitDelete()const override{return false;} ///<返回在退出线程时,不删除本对象
void SetWorkThreadIndex(const uint index)
{
work_thread_index=index;
}
void ExitWork(const bool fc)
{
force_close=fc;
Thread::WaitExit();
}
virtual void ProcEndThread() override
{
if(!force_close) //不是强退
while(work_proc->OnExecuteWork(work_thread_index)); //把工作全部做完
#ifdef _DEBUG
{
LOG_INFO(U8_TEXT("WorkThread Finish [")+thread_addr_string+U8_TEXT("]"));
}
#endif//_DEBUG
}
virtual bool Execute() override
{
if(!work_proc)
RETURN_FALSE;
work_proc->OnExecuteWork(work_thread_index);
return(true);
}
};//template<typename W> class WorkThread:public Thread
/**
* <br>
* 线<br>
* 线线便
*/
template<typename WP,typename WT> class WorkGroup
{
ObjectList<WP> wp_list; ///<投递器列表
ObjectList<WT> wt_list; ///<工作线程列表
bool run=false;
public:
virtual ~WorkGroup()
{
Close();
}
virtual bool Add(WP *wp)
{
if(!wp)return(false);
wp_list.Add(wp);
return(true);
}
virtual bool Add(WP **wp,const int count)
{
if(!wp)return(false);
wp_list.Add(wp,count);
return(true);
}
virtual bool Add(WT *wt)
{
if(!wt)return(false);
int index=wt_list.Add(wt);
wt->SetWorkThreadIndex(index);
return(true);
}
virtual bool Add(WT **wt,const int count)
{
if(!wt)return(false);
int index=wt_list.Add(wt,count);
for(int i=0;i<count;i++)
{
(*wt)->SetWorkThreadIndex(index);
++index;
++wt;
}
return(true);
}
virtual bool Start()
{
int count=wt_list.GetCount();
if(count<=0)
RETURN_FALSE;
WT **wt=wt_list.GetData();
for(int i=0;i<count;i++)
wt[i]->Start();
run=true;
return(true);
}
virtual void Close(bool force_close=false)
{
if(!run)return;
int count=wt_list.GetCount();
WT **wt=wt_list.GetData();
for(int i=0;i<count;i++)
wt[i]->ExitWork(force_close);
run=false;
}
};//template<typename WP,typename WT> class WorkGroup
}//namespace workflow
}//namespace hgl
#endif//HGL_WORKFLOW_INCLUDE